Whamcloud - gitweb
EX-2975 lipe: implement approximate LRU in lpcc_purge
authorLei Feng <flei@whamcloud.com>
Wed, 7 Apr 2021 08:08:55 +0000 (16:08 +0800)
committerAndreas Dilger <adilger@whamcloud.com>
Thu, 22 Jul 2021 17:35:00 +0000 (17:35 +0000)
LRU is actually FIFO based on the atime of cache file. But we
can only use limited memory to sort the atime of all files.
So we implement an approximate LRU in lpcc_purge.

Change-Id: Ic636d97b08ee6424f54a1088dfe07359df1f8f79
Signed-off-by: Lei Feng <flei@whamcloud.com>
Test-Parameters: trivial
Reviewed-on: https://review.whamcloud.com/43224
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: John L. Hammond <jhammond@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lipe/src/lpcc_purge.c

index e0bff19..47e0c7b 100644 (file)
@@ -2,8 +2,11 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <getopt.h>
+#include <libgen.h>
 #include <linux/lustre/lustre_idl.h>
 #include <lustre/lustreapi.h>
+#include <lustre/lustre_user.h>
+#include <pthread.h>
 #include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 
 #include "debug.h"
+#include "lipe_object_attrs.h"
+#include "policy.h"
 
 
 #define DEF_HIGH_USAGE                 90
 #define DEF_LOW_USAGE                  75
 #define DEF_INTERVAL                   5
 #define DEF_SCAN_THREADS               1
+#define DEF_CANDIDATE_NUM              2000
+#define MAX_CANDIDATE_NUM              100000
+#define MAX_ATIME_FID_LEN              128
 
+#define OPT_DRY_RUN                    1
+#define OPT_CANDIDATE_NUM              2
+#define OPT_FORCE_CLEAR                        3
 #define OPT_LOG_LEVEL                  4
 
 struct lpcc_purge_options {
@@ -34,6 +45,10 @@ struct lpcc_purge_options {
 
        int o_interval;
        int o_scan_threads;
+
+       int o_candidate_num;
+       bool o_dry_run;
+       bool o_force_clear;
 };
 static struct lpcc_purge_options opt = {
        .o_rwid                 = -1,
@@ -41,16 +56,124 @@ static struct lpcc_purge_options opt = {
        .o_low_usage            = DEF_LOW_USAGE,
        .o_interval             = DEF_INTERVAL,
        .o_scan_threads         = DEF_SCAN_THREADS,
+       .o_candidate_num        = DEF_CANDIDATE_NUM,
+       .o_dry_run              = false,
+       .o_force_clear          = false,
 };
 
 bool exit_flag = false;
 struct lpcc_purge_stats {
        double s_start_usage;
+       pthread_mutex_t s_lock;
 };
+
 static struct lpcc_purge_stats stats = {
        .s_start_usage  = -1,
+       .s_lock         = PTHREAD_MUTEX_INITIALIZER,
+};
+
+static struct lipe_instance *instance = NULL;
+
+struct lpcc_purge_candidate {
+       uint64_t c_atime_ms;
+       struct lu_fid c_fid;
+       char c_path[0];
 };
 
+struct lpcc_purge_candidate_set {
+       struct lpcc_purge_candidate ** cs_arr;
+       int cs_capacity;        /* the total capacity of array */
+       int cs_count;           /* the number of valid elements in array */
+       pthread_mutex_t cs_lock;
+};
+static struct lpcc_purge_candidate_set candidate_set;
+
+struct lpcc_purge_candidate *lpcc_purge_candidate_new(
+       uint64_t atime_ms, struct lu_fid fid, const char *path)
+{
+       struct lpcc_purge_candidate *candidate =
+               (struct lpcc_purge_candidate *)malloc(
+               sizeof(struct lpcc_purge_candidate) + strlen(path) + 1);
+
+       if (candidate == NULL) {
+               llapi_error(LLAPI_MSG_FATAL, errno,
+                       "cannot allocate memory for candidate!");
+               exit(1);
+       }
+
+       candidate->c_atime_ms = atime_ms;
+       candidate->c_fid = fid;
+       strcpy(candidate->c_path, path);
+
+       return candidate;
+}
+
+void lpcc_purge_candidate_destroy(struct lpcc_purge_candidate *candidate)
+{
+       free(candidate);
+}
+
+static int _compare_candidate(const void *p1, const void *p2)
+{
+       const struct lpcc_purge_candidate *c1 = *(const struct lpcc_purge_candidate **)(p1);
+       const struct lpcc_purge_candidate *c2 = *(const struct lpcc_purge_candidate **)(p2);
+
+       if (c1->c_atime_ms < c2->c_atime_ms)
+               return -1;
+       if (c1->c_atime_ms > c2->c_atime_ms)
+               return 1;
+       return 0;
+}
+
+static void lpcc_purge_candidate_set_init(int capacity)
+{
+       candidate_set.cs_arr = calloc(capacity, sizeof(candidate_set.cs_arr[0]));
+       if (candidate_set.cs_arr == NULL) {
+               llapi_error(LLAPI_MSG_FATAL, errno,
+                       "cannot allocate memory for candidate set!");
+               exit(1);
+       }
+       candidate_set.cs_capacity = capacity;
+       candidate_set.cs_count = 0;
+       pthread_mutex_init(&candidate_set.cs_lock, NULL);
+}
+
+static void lpcc_purge_candidate_set_destroy(void)
+{
+       int i;
+       for (i = 0; i < candidate_set.cs_count; i++)
+               lpcc_purge_candidate_destroy(candidate_set.cs_arr[i]);
+
+       free(candidate_set.cs_arr);
+}
+
+static void lpcc_purge_candidate_set_clear(void)
+{
+       int i;
+       for (i = 0; i < candidate_set.cs_count; i++)
+               lpcc_purge_candidate_destroy(candidate_set.cs_arr[i]);
+
+       candidate_set.cs_count = 0;
+}
+
+static void lpcc_purge_candidate_set_append(
+       struct lpcc_purge_candidate *candidate)
+{
+       if (candidate_set.cs_count >= candidate_set.cs_capacity) {
+               llapi_error(LLAPI_MSG_FATAL, EFAULT,
+                       "Out of boundary of array");
+               exit(1);
+       }
+
+       candidate_set.cs_arr[candidate_set.cs_count++] = candidate;
+}
+
+void lpcc_purge_candidate_set_sort(void)
+{
+       qsort(candidate_set.cs_arr, candidate_set.cs_count,
+               sizeof(candidate_set.cs_arr[0]), &_compare_candidate);
+}
+
 static void lpcc_purge_null_handler(int signal)
 {
 }
@@ -58,54 +181,68 @@ static void lpcc_purge_null_handler(int signal)
 static void lpcc_purge_sigint_handler(int signal)
 {
        psignal(signal, "exiting");
-
-       exit_flag = true;
+       _exit(0);
 }
 
 static void usage(void)
 {
        printf("Usage: %s [options]\n"
                "\t-b, --debug, enable debugging output\n"
-               "\t    --log_level={debug|info|normal|warn|error|fatal|off}, set log level (default: info)\n"
+               "\t    --log-level={debug|info|normal|warn|error|fatal|off}, set log level (default: info)\n"
                "\t-f, --config-file=FILE\n"
                "\t-C, --cache=DIR root directory of PCC\n"
                "\t-M, --mount=DIR local Lustre client's mountpoint\n"
                "\t-A, --rwid=NUM the rwid of PCC\n"
-               "\t-H, --high_usage=NUM %% of space or inode to start purging (default: %u)\n"
-               "\t-L, --low_usage=NUM %% of space or inode to stop purging (default: %u)\n"
+               "\t-H, --high-usage=NUM %% of space or inode to start purging (default: %u)\n"
+               "\t-L, --low-usage=NUM %% of space or inode to stop purging (default: %u)\n"
                "\t-i, --interval=NUM, seconds to next check (default: %u)\n"
-               "\t-t, --scan_threads=NUM scanning threads (default: %u)\n"
+               "\t-t, --scan-threads=NUM scanning threads (default: %u)\n"
+               "\t    --candidate-num=NUM, candidate number of approximate LRU (default: %d)\n"
+               "\t    --dry-run, scan once but don't detach file really\n"
                "\t-h, --help, print this help message\n",
 
                program_invocation_short_name,
                DEF_HIGH_USAGE,
                DEF_LOW_USAGE,
                DEF_INTERVAL,
-               DEF_SCAN_THREADS
+               DEF_SCAN_THREADS,
+               DEF_CANDIDATE_NUM
        );
 }
 
 static struct option long_options[] = {
        { "debug", no_argument, NULL, 'b'},
        { "config-file", required_argument, NULL, 'f'},
-       { "log_level", required_argument, NULL, OPT_LOG_LEVEL},
+       { "log-level", required_argument, NULL, OPT_LOG_LEVEL},
        { "cache", required_argument, NULL, 'C'},
        { "mount", required_argument, NULL, 'M'},
        { "rwid", required_argument, NULL, 'A'},
-       { "high_usage", required_argument, NULL, 'H'},
-       { "low_usage", required_argument, NULL, 'L'},
+       { "high-usage", required_argument, NULL, 'H'},
+       { "low-usage", required_argument, NULL, 'L'},
        { "interval", required_argument, NULL, 'i'},
-       { "scan_threads", required_argument, NULL, 't'},
+       { "scan-threads", required_argument, NULL, 't'},
+       { "dry-run", no_argument, NULL, OPT_DRY_RUN},
+       { "candidate-num", required_argument, NULL, OPT_CANDIDATE_NUM},
+       { "force-clear", no_argument, NULL, OPT_FORCE_CLEAR},
        { "help", no_argument, NULL, 'h' },
        { NULL }
 };
 
 static struct option *lpcc_purge_keyword_lookup(const char *keyword)
 {
-       int i = 0;
+       int i;
+       char keyword2[PATH_MAX];
+
+       for (i = 0; i <= strlen(keyword) && i < PATH_MAX; i++) {
+               if (keyword[i] == '_')
+                       keyword2[i] = '-';
+               else
+                       keyword2[i] = keyword[i];
+       }
 
+       i = 0;
        while (long_options[i].name) {
-               if (strcmp(keyword, long_options[i].name) == 0)
+               if (strcmp(keyword2, long_options[i].name) == 0)
                        return long_options + i;
                i++;
        }
@@ -260,7 +397,7 @@ static void lpcc_purge_process_opt(int c, char *optarg)
                break;
        case OPT_LOG_LEVEL:
                log_level = parse_log_level(optarg);
-               llapi_printf(LLAPI_MSG_INFO, "Set log level: %s\n",
+               llapi_printf(LLAPI_MSG_INFO, "set log level: %s\n",
                        log_level_strs[log_level]);
                llapi_msg_set_level(log_level);
 
@@ -323,6 +460,22 @@ static void lpcc_purge_process_opt(int c, char *optarg)
                }
                opt.o_scan_threads = value;
                break;
+       case OPT_DRY_RUN:
+               opt.o_dry_run = true;
+               break;
+       case OPT_CANDIDATE_NUM:
+               value = strtol(optarg, &endptr, 10);
+               if (*endptr != '\0' || value < 100 || value > MAX_CANDIDATE_NUM) {
+                       llapi_error(LLAPI_MSG_FATAL, -EINVAL,
+                                   "invalid candidate number: '%s'",
+                                   optarg);
+                       exit(1);
+               }
+               opt.o_candidate_num = value;
+               break;
+       case OPT_FORCE_CLEAR:
+               opt.o_force_clear = true;
+               break;
        default:
                llapi_error(LLAPI_MSG_FATAL, -EINVAL,
                                    "invalid argument: '%s'",
@@ -454,6 +607,7 @@ static void lpcc_purge_lock_pidfile(void)
 static double lpcc_purge_get_fs_usage(const char *fs)
 {
        int rc;
+       double usage, i_usage;
        struct statfs statfs_buf;
 
        rc = statfs(opt.o_cache, &statfs_buf);
@@ -462,14 +616,18 @@ static double lpcc_purge_get_fs_usage(const char *fs)
                exit(1);
        }
 
-       return 100.0 * (statfs_buf.f_blocks - statfs_buf.f_bfree) / statfs_buf.f_blocks;
+       usage = 100.0 * (statfs_buf.f_blocks - statfs_buf.f_bfree) / statfs_buf.f_blocks;
+       i_usage = 100.0 * (statfs_buf.f_files - statfs_buf.f_ffree) / statfs_buf.f_files;
+
+       return (usage > i_usage) ? usage : i_usage;
 }
 
 static void lpcc_purge_wait_for_scan(void)
 {
-       while (!exit_flag) {
+       while (1) {
                double usage = lpcc_purge_get_fs_usage(opt.o_cache);
-               if (usage >= opt.o_high_usage) {
+               llapi_printf(LLAPI_MSG_DEBUG, "usage: %.1f\n", usage);
+               if (usage >= opt.o_high_usage || opt.o_dry_run) {
                        stats.s_start_usage = usage;
                        break;
                } else {
@@ -478,12 +636,205 @@ static void lpcc_purge_wait_for_scan(void)
        }
 }
 
+/**
+ * This is a temporary function to remove the cache file and empty hash dir
+ * recursively up to the cache dir.
+ *
+ * The function is only for testing, it's not safe in production enviroment.
+ */
+static int remove_hash_dir(const char *path)
+{
+       int rc;
+       char dir[PATH_MAX], tmp[PATH_MAX];
+
+       unlink(path);
+
+       strcpy(tmp, path);
+       strcpy(dir, dirname(tmp));
+
+       while (strlen(dir) > strlen(opt.o_cache)) {
+               rc = rmdir(dir);
+               if (rc) {
+                       if (errno == ENOTEMPTY || errno == EBUSY) {
+                               rc = 0;
+                       }
+                       else {
+                               rc = -errno;
+                               llapi_printf(LLAPI_MSG_WARN,
+                                       "cannot remove hash dir: %s. errno = %d\n",
+                                       dir, -rc);
+                       }
+                       break;
+               }
+               else {
+                       strcpy(tmp, dir);
+                       strcpy(dir, dirname(tmp));
+               }
+       }
+
+       return rc;
+}
+
+static int lpcc_purge_detach_candidate(const char *mnt,
+       const struct lpcc_purge_candidate *candidate)
+{
+       int rc;
+
+       llapi_printf(LLAPI_MSG_DEBUG, "detach fid: "DFID"\n", PFID(&candidate->c_fid));
+
+       if (opt.o_dry_run) {
+               return -1;
+       }
+
+       rc = llapi_pcc_detach_fid(mnt, &candidate->c_fid, PCC_DETACH_FL_UNCACHE);
+       if (rc) {
+               llapi_error(LLAPI_MSG_WARN, -rc,  "cannot detach fid: "DFID,
+                       PFID(&candidate->c_fid));
+               return rc;
+       }
+
+       if (opt.o_force_clear)
+               remove_hash_dir(candidate->c_path);
+
+       return 0;
+}
+
+static int lpcc_purge_scan_callback(struct lipe_instance *instance,
+                            struct lipe_object *object,
+                            struct lipe_object_attrs *attrs)
+{
+       int rc;
+       struct lu_fid fid;
+       char *path = NULL;
+       struct lpcc_purge_candidate *candidate = NULL;
+
+       if (!S_ISREG(attrs->loa_mode)) {
+               // skip non-regular file
+               return 0;
+       }
+
+       path = strdup(object->u.lo_posix.lop_path);
+       if (path == NULL) {
+               rc = -errno;
+               goto out;
+       }
+
+       const char * name = basename(path);
+       rc = sscanf(name, SFID, RFID(&fid));
+       if (rc != 3)  {
+               // Not an valid fid string, skip it
+               rc = 0;
+               goto out;
+       }
+
+       candidate = lpcc_purge_candidate_new(attrs->loa_atime_ms, fid, object->u.lo_posix.lop_path);
+       if (candidate == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       llapi_printf(LLAPI_MSG_DEBUG, "append fid: "DFID"\n", PFID(&fid));
+       pthread_mutex_lock(&candidate_set.cs_lock);
+       lpcc_purge_candidate_set_append(candidate);
+       candidate = NULL;
+
+       // check whether the pool is full
+       if (candidate_set.cs_count < candidate_set.cs_capacity) {
+               pthread_mutex_unlock(&candidate_set.cs_lock);
+               rc = 0;
+               goto out;
+       }
+
+       // pool is full
+       llapi_printf(LLAPI_MSG_DEBUG, "candidate set is full, sort it first\n");
+       lpcc_purge_candidate_set_sort();
+
+       struct lpcc_purge_candidate *candidates[100];
+       int i, j, n_detach, n_discard;
+
+       n_detach = stats.s_start_usage - opt.o_low_usage;
+       n_discard = opt.o_low_usage;
+
+       if (n_detach < 0) {
+               n_detach = 0;
+       }
+
+       llapi_printf(LLAPI_MSG_DEBUG,
+               "copy out %d elements from the head\n", n_detach);
+       for (i = 0; i < n_detach; i++) {
+               candidates[i] = candidate_set.cs_arr[i];
+       }
+
+       llapi_printf(LLAPI_MSG_DEBUG,
+               "shift pool and remove %d elements from the tail\n", n_discard);
+       memmove(&candidate_set.cs_arr[0], &candidate_set.cs_arr[n_detach],
+               (candidate_set.cs_count - n_detach - n_discard) * sizeof(candidate_set.cs_arr[0]));
+
+       for (j = candidate_set.cs_count - n_discard; j < candidate_set.cs_count; j++) {
+               lpcc_purge_candidate_destroy(candidate_set.cs_arr[j]);
+       }
+       candidate_set.cs_count = candidate_set.cs_count - n_detach - n_discard;
+
+       pthread_mutex_unlock(&candidate_set.cs_lock);
+
+       // Detach files from candidates
+       llapi_printf(LLAPI_MSG_DEBUG, "detach files...\n");
+       for (i = 0; i < n_detach; i++) {
+               rc = lpcc_purge_detach_candidate(opt.o_mount, candidates[i]);
+               lpcc_purge_candidate_destroy(candidates[i]);
+       }
+
+       rc = 0;
+out:
+       free(path);
+       lpcc_purge_candidate_destroy(candidate);
+
+       return rc;
+}
+
 static void lpcc_purge_scan(void)
 {
+       int rc;
+       struct lipe_policy policy;
+       struct scan_result result;
+       bool ldd_err;
+
+       llapi_printf(LLAPI_MSG_INFO, "do scanning...\n");
+       stats.s_start_usage = lpcc_purge_get_fs_usage(opt.o_cache);
+
+       lipe_policy_init(&policy);
+       policy.lp_attr_bits = LIPE_OBJECT_ATTR_ATTR;
+       memset(&result, 0x00, sizeof(result));
+
+       rc = lipe_scan(instance, &policy, &result, NULL,
+                       NULL, opt.o_scan_threads, "lpcc_purge",
+                       NULL, true, false, &ldd_err);
+       if (rc < 0) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                       "failed to run lipe_scan");
+       }
+
+       llapi_printf(LLAPI_MSG_INFO, "scanning end\n");
 }
 
 static void lpcc_purge_free_space(void)
 {
+       int i;
+
+       llapi_printf(LLAPI_MSG_INFO, "free space...\n");
+
+       lpcc_purge_candidate_set_sort();
+
+       for (i = 0; i < candidate_set.cs_count; i++) {
+               double usage = lpcc_purge_get_fs_usage(opt.o_cache);
+               if (usage >= opt.o_low_usage || opt.o_dry_run)
+                       lpcc_purge_detach_candidate(opt.o_mount, candidate_set.cs_arr[i]);
+               else
+                       break;
+       }
+
+       fflush(stdout);
+       lpcc_purge_candidate_set_clear();
 }
 
 int main(int argc, char *argv[])
@@ -498,19 +849,28 @@ int main(int argc, char *argv[])
        signal(SIGTERM, &lpcc_purge_sigint_handler);
 
        lpcc_purge_parse_opts(argc, argv);
-
        lpcc_purge_verify_opts();
-
        lpcc_purge_lock_pidfile();
 
-       while(!exit_flag) {
+       lpcc_purge_candidate_set_init(opt.o_candidate_num);
+
+       instance = lipe_instance_alloc(0, opt.o_cache);
+       instance->li_callback = lpcc_purge_scan_callback;
 
+       while(1) {
                lpcc_purge_wait_for_scan();
 
                lpcc_purge_scan();
 
                lpcc_purge_free_space();
+
+               if (opt.o_dry_run) {
+                       break;
+               }
        }
 
-       return rc == 0 ? 0 : 255;
+       lipe_instance_free(instance);
+       lpcc_purge_candidate_set_destroy();
+
+       return rc == 0 ? 0 : EXIT_FAILURE;
 }