char *fwu_path;
};
/* Check if we have C11 atomics available: C11 or later, and the
 * implementation does not opt out via __STDC_NO_ATOMICS__. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__)
 #include <stdatomic.h>
 #define HAS_STDATOMIC 1
#else
 #define HAS_STDATOMIC 0
#endif

/* Define atomic fetch-and-add / fetch-and-sub operations.
 * Prefer the standard C11 <stdatomic.h> functions; fall back to the
 * GCC/Clang __sync builtins on pre-C11 toolchains.  All code must go
 * through these wrappers rather than calling either API directly. */
#if HAS_STDATOMIC
 #define ll_atomic_fetch_add(ptr, val) atomic_fetch_add(ptr, val)
 #define ll_atomic_fetch_sub(ptr, val) atomic_fetch_sub(ptr, val)
#else
 #define ll_atomic_fetch_add(ptr, val) __sync_fetch_and_add(ptr, val)
 #define ll_atomic_fetch_sub(ptr, val) __sync_fetch_and_sub(ptr, val)
#endif

/* Work queue for managing parallel processing.
 * Singly-linked FIFO of work units; producers append at the tail,
 * workers consume from the head.  fwq_lock protects the list and is
 * paired with fwq_sleep_cond, on which idle workers block. */
struct find_work_queue {
	struct find_work_unit *fwq_head; /* Take work from head */
	struct find_work_unit *fwq_tail; /* ... add to tail */
	pthread_mutex_t fwq_lock;        /* guards fwq_head/fwq_tail */
	pthread_cond_t fwq_sleep_cond;   /* workers wait here when empty */
	/* Atomic counter, active work units; updated only through the
	 * ll_atomic_fetch_add()/ll_atomic_fetch_sub() wrappers */
#if HAS_STDATOMIC
	atomic_int fwq_active_units;
#else
	int fwq_active_units;
#endif
	bool fwq_shutdown; /* Flag to signal shutdown */
	int fwq_error;
};
return magic == LMV_MAGIC_FOREIGN;
}
-static void find_param_fini(struct find_param *param)
+void find_param_fini(struct find_param *param)
{
if (param->fp_migrate)
return;
}
}
-static int common_param_init(struct find_param *param, char *path)
+int common_param_init(struct find_param *param, char *path)
{
int lum_size = get_mds_md_size(path);
if (!buf)
return -ENOMEM;
- snprintf(buf, PATH_MAX + 1, "%s", path);
+ ret = snprintf(buf, PATH_MAX + 1, "%s", path);
+ if (ret < 0 || ret >= PATH_MAX + 1) {
+ ret = -ENAMETOOLONG;
+ goto out;
+ }
ret = common_param_init(param, buf);
if (ret)
goto out;
param->fp_depth = 0;
- ret = llapi_semantic_traverse(buf, 2 * PATH_MAX, -1, sem_init,
+ ret = llapi_semantic_traverse(buf, 2 * PATH_MAX + 1, -1, sem_init,
sem_fini, param, NULL);
out:
find_param_fini(param);
#include "lustreapi_internal.h"
#include "lstddef.h"
+
+
+static void work_unit_free(struct find_work_unit *unit);
+
/* Placeholder worker function - just for testing thread creation */
static void *find_worker(void *arg)
{
struct find_work_queue *queue = (struct find_work_queue *) arg;
+ struct find_work_unit *unit;
/* TODO: Implement actual work processing */
- while (!queue->fwq_shutdown)
- sleep(1); /* Just keep thread alive for now */
+ while (!queue->fwq_shutdown) {
+ /* Get work unit from queue */
+ pthread_mutex_lock(&queue->fwq_lock);
+ while (queue->fwq_head == NULL && !queue->fwq_shutdown) {
+ pthread_cond_wait(&queue->fwq_sleep_cond,
+ &queue->fwq_lock);
+ }
+
+ if (queue->fwq_shutdown) {
+ pthread_mutex_unlock(&queue->fwq_lock);
+ break;
+ }
+
+ /* Dequeue work unit */
+ unit = queue->fwq_head;
+
+ queue->fwq_head = unit->fwu_next;
+ if (queue->fwq_head == NULL)
+ queue->fwq_tail = NULL;
+ pthread_mutex_unlock(&queue->fwq_lock);
+
+ /* TODO: processing goes here */
+ sleep(0.1);
+
+ work_unit_free(unit);
+ __sync_fetch_and_sub(&queue->fwq_active_units, 1);
+ }
return NULL;
}
return 0;
}
+/* Free a work unit */
+static void work_unit_free(struct find_work_unit *unit)
+{
+ if (!unit)
+ return;
+
+ free(unit->fwu_path);
+ free(unit->fwu_de);
+ free(unit);
+}
+
+/* Create a new work unit */
+static struct find_work_unit *work_unit_create(const char *path,
+ struct find_param *param,
+ struct dirent64 *de)
+{
+ struct find_work_unit *unit;
+
+ unit = malloc(sizeof(*unit));
+ if (!unit)
+ return NULL;
+
+ /* Initialize with zeros to ensure clean error handling */
+ memset(unit, 0, sizeof(*unit));
+
+ /* Copy the path */
+ unit->fwu_path = (char *)malloc(PATH_MAX + 1);
+ if (!unit->fwu_path)
+ goto error;
+ snprintf(unit->fwu_path, PATH_MAX + 1, "%s", path);
+
+ /* Copy the directory entry if provided */
+ if (de) {
+ unit->fwu_de = malloc(sizeof(*de));
+ if (!unit->fwu_de)
+ goto error;
+ memcpy(unit->fwu_de, de, sizeof(*de));
+ }
+
+ /* TODO: deep copy of param
+ unit->fwu_param = param; */
+
+ return unit;
+
+error:
+ work_unit_free(unit);
+ return NULL;
+}
+
+static int work_unit_create_and_add(const char *path, struct find_param *param,
+ struct dirent64 *dent)
+{
+ struct find_work_queue *queue = param->fp_queue;
+ struct find_work_unit *unit;
+ int rc = 0;
+
+ unit = work_unit_create(path, param, dent);
+ if (!unit) {
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ ll_atomic_fetch_add(&queue->fwq_active_units, 1);
+
+ pthread_mutex_lock(&queue->fwq_lock);
+
+ /* add to queue, at tail if there's already something on the queue */
+ if (queue->fwq_tail) {
+ queue->fwq_tail->fwu_next = unit;
+ } else {
+ queue->fwq_head = unit;
+ }
+ queue->fwq_tail = unit;
+
+ /* wake up any waiting workers */
+ pthread_cond_signal(&queue->fwq_sleep_cond);
+ pthread_mutex_unlock(&queue->fwq_lock);
+
+out:
+ return rc;
+}
+
+static int pfind_param_callback(char *path, struct find_param *param,
+ struct find_work_queue *queue)
+{
+ char *buf;
+ int ret;
+
+ if (strlen(path) > PATH_MAX) {
+ ret = -EINVAL;
+ llapi_error(LLAPI_MSG_ERROR, ret,
+ "Path name '%s' is too long", path);
+ return ret;
+ }
+
+ buf = (char *)malloc(PATH_MAX + 1);
+ if (!buf)
+ return -ENOMEM;
+
+ snprintf(buf, PATH_MAX + 1, "%s", path);
+ ret = common_param_init(param, buf);
+ if (ret)
+ goto out;
+
+ param->fp_queue = queue;
+ ret = work_unit_create_and_add(buf, param, NULL);
+ if (ret)
+ goto out;
+
+ /* Wait for all work to complete */
+ while (ll_atomic_fetch_add(&queue->fwq_active_units, 0) > 0)
+ sched_yield();
+
+out:
+ find_param_fini(param);
+ free(buf);
+ return ret < 0 ? ret : 0;
+}
+
int parallel_find(char *path, struct find_param *param)
{
struct find_work_queue queue;
goto cleanup;
}
/* Normal find - no parallelism yet */
- rc = param_callback(path, cb_find_init, cb_common_fini, param);
+ rc = pfind_param_callback(path, param, &queue);
+
/* Signal shutdown and wait for threads before cleanup */
queue.fwq_shutdown = true;