#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
+#include <libgen.h>
#include <linux/lustre/lustre_idl.h>
#include <lustre/lustreapi.h>
+#include <lustre/lustre_user.h>
+#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "debug.h"
+#include "lipe_object_attrs.h"
+#include "policy.h"
#define DEF_HIGH_USAGE 90
#define DEF_LOW_USAGE 75
#define DEF_INTERVAL 5
#define DEF_SCAN_THREADS 1
+#define DEF_CANDIDATE_NUM 2000
+#define MAX_CANDIDATE_NUM 100000
+#define MAX_ATIME_FID_LEN 128
+#define OPT_DRY_RUN 1
+#define OPT_CANDIDATE_NUM 2
+#define OPT_FORCE_CLEAR 3
#define OPT_LOG_LEVEL 4
struct lpcc_purge_options {
int o_interval;
int o_scan_threads;
+
+ int o_candidate_num;
+ bool o_dry_run;
+ bool o_force_clear;
};
static struct lpcc_purge_options opt = {
.o_rwid = -1,
.o_low_usage = DEF_LOW_USAGE,
.o_interval = DEF_INTERVAL,
.o_scan_threads = DEF_SCAN_THREADS,
+ .o_candidate_num = DEF_CANDIDATE_NUM,
+ .o_dry_run = false,
+ .o_force_clear = false,
};
bool exit_flag = false;
struct lpcc_purge_stats {
double s_start_usage;
+ pthread_mutex_t s_lock;
};
+
static struct lpcc_purge_stats stats = {
.s_start_usage = -1,
+ .s_lock = PTHREAD_MUTEX_INITIALIZER,
+};
+
+static struct lipe_instance *instance = NULL;
+
+/*
+ * One cache file that may be detached from PCC.
+ *
+ * Instances are created by lpcc_purge_candidate_new(), which allocates the
+ * structure and the path string in a single block; release them with
+ * lpcc_purge_candidate_destroy().
+ */
+struct lpcc_purge_candidate {
+	/* access time as reported in loa_atime_ms; the sort key of the pool */
+	uint64_t	c_atime_ms;
+	/* FID parsed from the cache file's base name */
+	struct lu_fid	c_fid;
+	/* NUL-terminated path of the cache file (C99 flexible array member) */
+	char		c_path[];
};
+/*
+ * Fixed-capacity pool of purge candidates.
+ *
+ * The array is allocated once by lpcc_purge_candidate_set_init(); the
+ * candidates stored in it are freed when the set is cleared or destroyed.
+ */
+struct lpcc_purge_candidate_set {
+	/* array of candidate pointers, cs_capacity slots long */
+	struct lpcc_purge_candidate	**cs_arr;
+	/* the total capacity of array */
+	int				cs_capacity;
+	/* the number of valid elements in array */
+	int				cs_count;
+	/* held by the scan callback while it appends to or trims the pool */
+	pthread_mutex_t			cs_lock;
+};
+static struct lpcc_purge_candidate_set candidate_set;
+
+/*
+ * Allocate a candidate that carries its own copy of \a path.
+ *
+ * \param atime_ms	access time stored in c_atime_ms
+ * \param fid		FID stored in c_fid
+ * \param path		NUL-terminated path copied into c_path
+ *
+ * \retval		the new candidate; never NULL, because the process
+ *			exits with status 1 when the allocation fails
+ */
+struct lpcc_purge_candidate *lpcc_purge_candidate_new(
+	uint64_t atime_ms, struct lu_fid fid, const char *path)
+{
+	/* bytes needed for the path including its terminating NUL */
+	size_t path_size = strlen(path) + 1;
+	struct lpcc_purge_candidate *entry;
+
+	entry = malloc(sizeof(struct lpcc_purge_candidate) + path_size);
+	if (entry == NULL) {
+		llapi_error(LLAPI_MSG_FATAL, errno,
+			    "cannot allocate memory for candidate!");
+		exit(1);
+	}
+
+	entry->c_fid = fid;
+	entry->c_atime_ms = atime_ms;
+	memcpy(entry->c_path, path, path_size);
+
+	return entry;
+}
+
+/*
+ * Release a candidate obtained from lpcc_purge_candidate_new().
+ * Passing NULL is allowed because free(NULL) is a no-op.
+ */
+void lpcc_purge_candidate_destroy(struct lpcc_purge_candidate *candidate)
+{
+	/* structure and path live in one allocation, so one free() suffices */
+	free(candidate);
+}
+
+/*
+ * qsort() comparator for an array of candidate pointers.
+ *
+ * Orders by ascending c_atime_ms, so the least recently accessed
+ * candidates end up at the head of the array.
+ *
+ * \retval -1, 0 or 1 when *p1 is older than, equal to or newer than *p2
+ */
+static int _compare_candidate(const void *p1, const void *p2)
+{
+	/* each array element is itself a pointer to a candidate */
+	const struct lpcc_purge_candidate *const *lhs = p1;
+	const struct lpcc_purge_candidate *const *rhs = p2;
+	uint64_t left_atime = (*lhs)->c_atime_ms;
+	uint64_t right_atime = (*rhs)->c_atime_ms;
+
+	return (left_atime > right_atime) - (left_atime < right_atime);
+}
+
+/*
+ * Set up the global candidate set with room for \a capacity entries.
+ *
+ * Exits the process with status 1 if the slot array cannot be allocated.
+ */
+static void lpcc_purge_candidate_set_init(int capacity)
+{
+	struct lpcc_purge_candidate **slots;
+
+	/* calloc leaves every slot NULL until a candidate is appended */
+	slots = calloc(capacity, sizeof(*slots));
+	if (slots == NULL) {
+		llapi_error(LLAPI_MSG_FATAL, errno,
+			    "cannot allocate memory for candidate set!");
+		exit(1);
+	}
+
+	candidate_set.cs_arr = slots;
+	candidate_set.cs_count = 0;
+	candidate_set.cs_capacity = capacity;
+	pthread_mutex_init(&candidate_set.cs_lock, NULL);
+}
+
+/*
+ * Free every candidate still held by the global set, then the slot array.
+ *
+ * The set's fields and mutex are left untouched, so the set must not be
+ * used again afterwards.
+ */
+static void lpcc_purge_candidate_set_destroy(void)
+{
+	int idx = 0;
+
+	while (idx < candidate_set.cs_count)
+		lpcc_purge_candidate_destroy(candidate_set.cs_arr[idx++]);
+
+	free(candidate_set.cs_arr);
+}
+
+/*
+ * Free every candidate held by the global set and mark it empty.
+ * The slot array itself is kept so the set can be refilled.
+ */
+static void lpcc_purge_candidate_set_clear(void)
+{
+	int idx = 0;
+
+	while (idx < candidate_set.cs_count)
+		lpcc_purge_candidate_destroy(candidate_set.cs_arr[idx++]);
+
+	candidate_set.cs_count = 0;
+}
+
+/*
+ * Store \a candidate in the next free slot of the global set.
+ *
+ * No locking is done here. Appending to a full set is treated as a fatal
+ * error and terminates the process with status 1.
+ */
+static void lpcc_purge_candidate_set_append(
+	struct lpcc_purge_candidate *candidate)
+{
+	if (candidate_set.cs_count < candidate_set.cs_capacity) {
+		candidate_set.cs_arr[candidate_set.cs_count] = candidate;
+		candidate_set.cs_count++;
+		return;
+	}
+
+	llapi_error(LLAPI_MSG_FATAL, EFAULT,
+		    "Out of boundary of array");
+	exit(1);
+}
+
+/*
+ * Sort the valid part of the global candidate set by ascending access
+ * time (see _compare_candidate()). No locking is done here.
+ */
+void lpcc_purge_candidate_set_sort(void)
+{
+	struct lpcc_purge_candidate **slots = candidate_set.cs_arr;
+	size_t used = candidate_set.cs_count;
+
+	qsort(slots, used, sizeof(slots[0]), &_compare_candidate);
+}
+
static void lpcc_purge_null_handler(int signal)
{
}
static void lpcc_purge_sigint_handler(int signal)
{
psignal(signal, "exiting");
-
- exit_flag = true;
+ _exit(0);
}
static void usage(void)
{
printf("Usage: %s [options]\n"
"\t-b, --debug, enable debugging output\n"
- "\t --log_level={debug|info|normal|warn|error|fatal|off}, set log level (default: info)\n"
+ "\t --log-level={debug|info|normal|warn|error|fatal|off}, set log level (default: info)\n"
"\t-f, --config-file=FILE\n"
"\t-C, --cache=DIR root directory of PCC\n"
"\t-M, --mount=DIR local Lustre client's mountpoint\n"
"\t-A, --rwid=NUM the rwid of PCC\n"
- "\t-H, --high_usage=NUM %% of space or inode to start purging (default: %u)\n"
- "\t-L, --low_usage=NUM %% of space or inode to stop purging (default: %u)\n"
+ "\t-H, --high-usage=NUM %% of space or inode to start purging (default: %u)\n"
+ "\t-L, --low-usage=NUM %% of space or inode to stop purging (default: %u)\n"
"\t-i, --interval=NUM, seconds to next check (default: %u)\n"
- "\t-t, --scan_threads=NUM scanning threads (default: %u)\n"
+ "\t-t, --scan-threads=NUM scanning threads (default: %u)\n"
+ "\t --candidate-num=NUM, candidate number of approximate LRU (default: %d)\n"
+ "\t --dry-run, scan once but don't detach file really\n"
"\t-h, --help, print this help message\n",
program_invocation_short_name,
DEF_HIGH_USAGE,
DEF_LOW_USAGE,
DEF_INTERVAL,
- DEF_SCAN_THREADS
+ DEF_SCAN_THREADS,
+ DEF_CANDIDATE_NUM
);
}
static struct option long_options[] = {
{ "debug", no_argument, NULL, 'b'},
{ "config-file", required_argument, NULL, 'f'},
- { "log_level", required_argument, NULL, OPT_LOG_LEVEL},
+ { "log-level", required_argument, NULL, OPT_LOG_LEVEL},
{ "cache", required_argument, NULL, 'C'},
{ "mount", required_argument, NULL, 'M'},
{ "rwid", required_argument, NULL, 'A'},
- { "high_usage", required_argument, NULL, 'H'},
- { "low_usage", required_argument, NULL, 'L'},
+ { "high-usage", required_argument, NULL, 'H'},
+ { "low-usage", required_argument, NULL, 'L'},
{ "interval", required_argument, NULL, 'i'},
- { "scan_threads", required_argument, NULL, 't'},
+ { "scan-threads", required_argument, NULL, 't'},
+ { "dry-run", no_argument, NULL, OPT_DRY_RUN},
+ { "candidate-num", required_argument, NULL, OPT_CANDIDATE_NUM},
+ { "force-clear", no_argument, NULL, OPT_FORCE_CLEAR},
{ "help", no_argument, NULL, 'h' },
{ NULL }
};
static struct option *lpcc_purge_keyword_lookup(const char *keyword)
{
- int i = 0;
+ int i;
+ char keyword2[PATH_MAX];
+
+ for (i = 0; i <= strlen(keyword) && i < PATH_MAX; i++) {
+ if (keyword[i] == '_')
+ keyword2[i] = '-';
+ else
+ keyword2[i] = keyword[i];
+ }
+ i = 0;
while (long_options[i].name) {
- if (strcmp(keyword, long_options[i].name) == 0)
+ if (strcmp(keyword2, long_options[i].name) == 0)
return long_options + i;
i++;
}
break;
case OPT_LOG_LEVEL:
log_level = parse_log_level(optarg);
- llapi_printf(LLAPI_MSG_INFO, "Set log level: %s\n",
+ llapi_printf(LLAPI_MSG_INFO, "set log level: %s\n",
log_level_strs[log_level]);
llapi_msg_set_level(log_level);
}
opt.o_scan_threads = value;
break;
+ case OPT_DRY_RUN:
+ opt.o_dry_run = true;
+ break;
+ case OPT_CANDIDATE_NUM:
+ value = strtol(optarg, &endptr, 10);
+ if (*endptr != '\0' || value < 100 || value > MAX_CANDIDATE_NUM) {
+ llapi_error(LLAPI_MSG_FATAL, -EINVAL,
+ "invalid candidate number: '%s'",
+ optarg);
+ exit(1);
+ }
+ opt.o_candidate_num = value;
+ break;
+ case OPT_FORCE_CLEAR:
+ opt.o_force_clear = true;
+ break;
default:
llapi_error(LLAPI_MSG_FATAL, -EINVAL,
"invalid argument: '%s'",
static double lpcc_purge_get_fs_usage(const char *fs)
{
int rc;
+ double usage, i_usage;
struct statfs statfs_buf;
rc = statfs(opt.o_cache, &statfs_buf);
exit(1);
}
- return 100.0 * (statfs_buf.f_blocks - statfs_buf.f_bfree) / statfs_buf.f_blocks;
+ usage = 100.0 * (statfs_buf.f_blocks - statfs_buf.f_bfree) / statfs_buf.f_blocks;
+ i_usage = 100.0 * (statfs_buf.f_files - statfs_buf.f_ffree) / statfs_buf.f_files;
+
+ return (usage > i_usage) ? usage : i_usage;
}
static void lpcc_purge_wait_for_scan(void)
{
- while (!exit_flag) {
+ while (1) {
double usage = lpcc_purge_get_fs_usage(opt.o_cache);
- if (usage >= opt.o_high_usage) {
+ llapi_printf(LLAPI_MSG_DEBUG, "usage: %.1f\n", usage);
+ if (usage >= opt.o_high_usage || opt.o_dry_run) {
stats.s_start_usage = usage;
break;
} else {
}
}
+/**
+ * This is a temporary function to remove the cache file and then its
+ * empty parent (hash) directories, walking upwards until the cache root
+ * (opt.o_cache) is reached.
+ *
+ * The function is only for testing; it is not safe in a production
+ * environment.
+ *
+ * \param path	path of the cache file to remove
+ *
+ * \retval 0		the walk finished, or stopped at a directory that is
+ *			not empty or is busy
+ * \retval -ENAMETOOLONG \a path does not fit in a PATH_MAX buffer
+ * \retval negative errno rmdir() failed for any other reason
+ */
+static int remove_hash_dir(const char *path)
+{
+	/* stays 0 when there is no parent directory below the cache root */
+	int rc = 0;
+	char dir[PATH_MAX], tmp[PATH_MAX];
+	size_t cache_len = strlen(opt.o_cache);
+
+	/* the copies below are unbounded, so reject oversized input first */
+	if (strlen(path) >= sizeof(tmp))
+		return -ENAMETOOLONG;
+
+	/* best effort: a failure shows up as ENOTEMPTY from rmdir() below */
+	unlink(path);
+
+	/* dirname() may modify its argument, so always work on a copy */
+	strcpy(tmp, path);
+	strcpy(dir, dirname(tmp));
+
+	/* the walk stops once dir is no longer than the cache root path */
+	while (strlen(dir) > cache_len) {
+		if (rmdir(dir) != 0) {
+			/* a non-empty or busy directory ends the walk quietly */
+			if (errno != ENOTEMPTY && errno != EBUSY) {
+				rc = -errno;
+				llapi_printf(LLAPI_MSG_WARN,
+					     "cannot remove hash dir: %s. errno = %d\n",
+					     dir, -rc);
+			}
+			break;
+		}
+
+		strcpy(tmp, dir);
+		strcpy(dir, dirname(tmp));
+	}
+
+	return rc;
+}
+
+/*
+ * Detach one candidate from PCC through llapi_pcc_detach_fid().
+ *
+ * \param mnt		passed to llapi_pcc_detach_fid() as its first argument
+ * \param candidate	candidate whose FID is detached
+ *
+ * \retval 0	the file was detached
+ * \retval -1	dry-run mode: the FID is only logged, nothing is detached
+ * \retval	otherwise the non-zero value returned by llapi_pcc_detach_fid()
+ */
+static int lpcc_purge_detach_candidate(const char *mnt,
+	const struct lpcc_purge_candidate *candidate)
+{
+	const struct lu_fid *fid = &candidate->c_fid;
+	int rc;
+
+	llapi_printf(LLAPI_MSG_DEBUG, "detach fid: "DFID"\n", PFID(fid));
+
+	if (opt.o_dry_run)
+		return -1;
+
+	rc = llapi_pcc_detach_fid(mnt, fid, PCC_DETACH_FL_UNCACHE);
+	if (rc != 0) {
+		llapi_error(LLAPI_MSG_WARN, -rc, "cannot detach fid: "DFID,
+			    PFID(fid));
+		return rc;
+	}
+
+	/* with --force-clear also remove the cache file and empty hash dirs;
+	 * the result of that cleanup is deliberately not propagated */
+	if (opt.o_force_clear)
+		remove_hash_dir(candidate->c_path);
+
+	return 0;
+}
+
+/**
+ * Scan callback installed as li_callback on the lipe instance.
+ *
+ * Regular files whose base name parses as a FID are added to the global
+ * candidate set. When the set becomes full it is sorted by access time;
+ * the oldest entries are detached, the newest entries are dropped, and the
+ * rest stay in the set for the next round.
+ *
+ * \param instance	not used by this function
+ * \param object	scanned object; its POSIX path is read
+ * \param attrs		object attributes; loa_mode and loa_atime_ms are read
+ *
+ * \retval 0		object skipped or processed
+ * \retval negative	errno value if the path could not be duplicated
+ */
+static int lpcc_purge_scan_callback(struct lipe_instance *instance,
+				    struct lipe_object *object,
+				    struct lipe_object_attrs *attrs)
+{
+	int rc;
+	struct lu_fid fid;
+	char *path = NULL;
+	const char *name;
+	struct lpcc_purge_candidate *candidate = NULL;
+	/* entries taken out of the set to be detached after unlocking */
+	struct lpcc_purge_candidate *candidates[100];
+	const int max_batch = sizeof(candidates) / sizeof(candidates[0]);
+	int i, j, n_detach, n_discard, n_keep;
+
+	if (!S_ISREG(attrs->loa_mode)) {
+		/* skip non-regular file */
+		return 0;
+	}
+
+	/* basename() may modify its argument, so work on a private copy */
+	path = strdup(object->u.lo_posix.lop_path);
+	if (path == NULL) {
+		rc = -errno;
+		goto out;
+	}
+
+	name = basename(path);
+	rc = sscanf(name, SFID, RFID(&fid));
+	if (rc != 3) {
+		/* Not a valid fid string, skip it */
+		rc = 0;
+		goto out;
+	}
+
+	candidate = lpcc_purge_candidate_new(attrs->loa_atime_ms, fid,
+					     object->u.lo_posix.lop_path);
+	if (candidate == NULL) {
+		rc = -ENOMEM;
+		goto out;
+	}
+
+	llapi_printf(LLAPI_MSG_DEBUG, "append fid: "DFID"\n", PFID(&fid));
+	pthread_mutex_lock(&candidate_set.cs_lock);
+	lpcc_purge_candidate_set_append(candidate);
+	/* ownership moved to the set; do not free it at "out" */
+	candidate = NULL;
+
+	/* check whether the pool is full */
+	if (candidate_set.cs_count < candidate_set.cs_capacity) {
+		pthread_mutex_unlock(&candidate_set.cs_lock);
+		rc = 0;
+		goto out;
+	}
+
+	/* pool is full */
+	llapi_printf(LLAPI_MSG_DEBUG, "candidate set is full, sort it first\n");
+	lpcc_purge_candidate_set_sort();
+
+	n_detach = stats.s_start_usage - opt.o_low_usage;
+	n_discard = opt.o_low_usage;
+
+	/*
+	 * Both counts come from option values. Clamp them so they can never
+	 * index past the local batch array or past the valid part of the
+	 * set, and so the memmove() length below cannot become negative.
+	 */
+	if (n_detach < 0)
+		n_detach = 0;
+	if (n_detach > max_batch)
+		n_detach = max_batch;
+	if (n_detach > candidate_set.cs_count)
+		n_detach = candidate_set.cs_count;
+	if (n_discard < 0)
+		n_discard = 0;
+	if (n_discard > candidate_set.cs_count - n_detach)
+		n_discard = candidate_set.cs_count - n_detach;
+	n_keep = candidate_set.cs_count - n_detach - n_discard;
+
+	/* the head of the sorted array holds the oldest access times */
+	llapi_printf(LLAPI_MSG_DEBUG,
+		     "copy out %d elements from the head\n", n_detach);
+	for (i = 0; i < n_detach; i++)
+		candidates[i] = candidate_set.cs_arr[i];
+
+	llapi_printf(LLAPI_MSG_DEBUG,
+		     "shift pool and remove %d elements from the tail\n",
+		     n_discard);
+	/* only the first n_keep slots are written, the tail is untouched */
+	memmove(&candidate_set.cs_arr[0], &candidate_set.cs_arr[n_detach],
+		n_keep * sizeof(candidate_set.cs_arr[0]));
+
+	for (j = candidate_set.cs_count - n_discard;
+	     j < candidate_set.cs_count; j++)
+		lpcc_purge_candidate_destroy(candidate_set.cs_arr[j]);
+
+	candidate_set.cs_count = n_keep;
+
+	pthread_mutex_unlock(&candidate_set.cs_lock);
+
+	/*
+	 * Detach files from candidates outside the lock. A failed detach is
+	 * reported by lpcc_purge_detach_candidate() and does not fail the
+	 * callback.
+	 */
+	llapi_printf(LLAPI_MSG_DEBUG, "detach files...\n");
+	for (i = 0; i < n_detach; i++) {
+		lpcc_purge_detach_candidate(opt.o_mount, candidates[i]);
+		lpcc_purge_candidate_destroy(candidates[i]);
+	}
+
+	rc = 0;
+out:
+	free(path);
+	lpcc_purge_candidate_destroy(candidate);
+
+	return rc;
+}
+
static void lpcc_purge_scan(void)
{
+	struct scan_result result;
+	struct lipe_policy policy;
+	bool ldd_err;
+	int rc;
+
+	llapi_printf(LLAPI_MSG_INFO, "do scanning...\n");
+
+	/* usage at scan start; the scan callback reads it to size batches */
+	stats.s_start_usage = lpcc_purge_get_fs_usage(opt.o_cache);
+
+	memset(&result, 0x00, sizeof(result));
+	lipe_policy_init(&policy);
+	policy.lp_attr_bits = LIPE_OBJECT_ATTR_ATTR;
+
+	/* the scan runs on the global instance with opt.o_scan_threads
+	 * threads; a failure is only logged */
+	rc = lipe_scan(instance, &policy, &result, NULL,
+		       NULL, opt.o_scan_threads, "lpcc_purge",
+		       NULL, true, false, &ldd_err);
+	if (rc < 0)
+		llapi_error(LLAPI_MSG_ERROR, rc,
+			    "failed to run lipe_scan");
+
+	llapi_printf(LLAPI_MSG_INFO, "scanning end\n");
}
static void lpcc_purge_free_space(void)
{
+	int idx = 0;
+
+	llapi_printf(LLAPI_MSG_INFO, "free space...\n");
+
+	/* oldest access time first */
+	lpcc_purge_candidate_set_sort();
+
+	while (idx < candidate_set.cs_count) {
+		/* usage is re-read before every detach */
+		double usage = lpcc_purge_get_fs_usage(opt.o_cache);
+
+		/* stop once usage is below the low mark; a dry run walks
+		 * the whole set regardless of usage */
+		if (!(usage >= opt.o_low_usage || opt.o_dry_run))
+			break;
+
+		lpcc_purge_detach_candidate(opt.o_mount,
+					    candidate_set.cs_arr[idx]);
+		idx++;
+	}
+
+	fflush(stdout);
+	/* whatever is left in the set is dropped before the next scan */
+	lpcc_purge_candidate_set_clear();
}
int main(int argc, char *argv[])
signal(SIGTERM, &lpcc_purge_sigint_handler);
lpcc_purge_parse_opts(argc, argv);
-
lpcc_purge_verify_opts();
-
lpcc_purge_lock_pidfile();
- while(!exit_flag) {
+ lpcc_purge_candidate_set_init(opt.o_candidate_num);
+
+ instance = lipe_instance_alloc(0, opt.o_cache);
+ instance->li_callback = lpcc_purge_scan_callback;
+ while(1) {
lpcc_purge_wait_for_scan();
lpcc_purge_scan();
lpcc_purge_free_space();
+
+ if (opt.o_dry_run) {
+ break;
+ }
}
- return rc == 0 ? 0 : 255;
+ lipe_instance_free(instance);
+ lpcc_purge_candidate_set_destroy();
+
+ return rc == 0 ? 0 : EXIT_FAILURE;
}