do { typeof(a) __swap_tmp = (a); (a) = (b); (b) = __swap_tmp; } while (0)
#endif
+#define str_empty(s) (!*(s))
+
#define USEC_PER_MSEC 1000
#define USEC_PER_SEC 1000000
#define DEF_POOL_REFRESH_INTV (10 * 60)
/* --src-free, deprecated */
/* --tgt-free, deprecated */
"\t--resync-all-stales=0|1 (default: %u)\n"
- "\t--slow-pool=POOL (default '%s')\n"
+ "\t--slow-pool=POOL[:COMPRESS[:LVL]] (default '%s')\n"
"\t--slow-pool-max-used=MAX stop mirroring to POOL when %% used reaches MAX (default %d)\n"
"\t--ssh-exec-timeout=SECS, ssh connection timeout for remote exec command (default: %d)\n"
"\t--ssh-log-verbosity=V, set SSH_OPTIONS_LOG_VERBOSITY to V (default: 0)\n"
bool rag_opts_checked;
bool rag_has_bwlimit;
bool rag_has_stats;
- int rag_check_in_progress;
+ bool rag_check_in_progress;
struct lipe_list_head rag_list;
struct lipe_list_head rag_ssh_list;
pthread_mutex_t rag_ssh_lock;
int rj_check_job;
int rj_stripes;
int rj_stripe_size;
- char *rj_pool;
+ struct pool_list *rj_pool;
char *rj_mirror_opts;
void (*rj_callback)(struct resync_job *, void *, int rc);
void *rj_callback_data;
static void lamigo_show_progress(void);
struct pool_list *fast_pools; /* fast pools */
-struct pool_list *slow_pools; /* slow pool */
+struct pool_list *slow_pools; /* slow pool. Currently the only slow pool */
static void *lamigo_refresh_statfs_thread(void *arg);
static inline bool are_agents_busy(void)
fprintf(f,
"\"pool\":\"%s\",\n"
+ "\"compression\":\"%s\",\n"
"\"osts\":%d,\n"
"\"used_kb\":%llu,\n"
"\"total_kb\":%llu,\n"
"\"open\":%d\n",
- pl->pl_pool, pl->pl_ostnr,
+ pl->pl_pool,
+ str_empty(pl->pl_compression) ?
+ "none" : pl->pl_compression+3, /* "-Z type:level" */
+ pl->pl_ostnr,
pl->pl_used_kb, pl->pl_total_kb, (int)pl->pl_is_open);
fprintf(f, "}\n");
}
fprintf(f, "%s%s", i ? "," : "", pl->pl_pool);
fprintf(f, "\n"
" slow_pool: %s\n"
+ " slow_pool_compression: %s\n"
" min_age: %u\n"
" max_cache: %lu\n"
" rescan: %u\n"
" include_dom: %d\n"
" resync_all_stales: %u\n"
" ssh_exec_to: %ld\n",
- opt.o_slow_pool, opt.o_min_age, opt.o_cache_size,
+ slow_pools->pl_pool,
+ str_empty(slow_pools->pl_compression) ?
+ "none" : slow_pools->pl_compression+3, /* "-Z type:level" */
+ opt.o_min_age, opt.o_cache_size,
opt.o_rescan, opt.o_num_threads, opt.o_pool_refresh,
opt.o_progress_interval, opt.o_alr_periods,
opt.o_alr_period_time, opt.o_alr_warmup_k,
" osts: %d\n"
" used_kb: %llu\n"
" total_kb: %llu\n"
- " open: %d\n", pl->pl_pool, pl->pl_ostnr,
+ " open: %d\n",
+ pl->pl_pool, pl->pl_ostnr,
pl->pl_used_kb, pl->pl_total_kb, (int)pl->pl_is_open);
for (pl = slow_pools; pl != NULL; pl = pl->pl_next, i++)
" osts: %d\n"
" used_kb: %llu\n"
" total_kb: %llu\n"
- " open: %d\n", pl->pl_pool, pl->pl_ostnr,
+ " open: %d\n",
+ pl->pl_pool, pl->pl_ostnr,
pl->pl_used_kb, pl->pl_total_kb, (int)pl->pl_is_open);
fprintf(f, "stats:\n"
goto out;
}
- if ((strstr(errbuf, "stats-interval") != NULL) ||
- (strstr(outbuf, "stats-interval") != NULL)) {
- rc = snprintf(agent->rag_extra_options,
- sizeof(agent->rag_extra_options) - 1,
- "--stats --stats-interval=%d",
- opt.o_mirror_stats_sec);
+ if (strstr(errbuf, "stats-interval") != NULL ||
+ strstr(outbuf, "stats-interval") != NULL) {
+ snprintf(agent->rag_extra_options,
+ sizeof(agent->rag_extra_options),
+ "--stats --stats-interval=%d",
+ opt.o_mirror_stats_sec);
agent->rag_has_stats = true;
}
- if ((strstr(errbuf, "bandwidth") != NULL) ||
- (strstr(outbuf, "bandwidth") != NULL))
- agent->rag_has_bwlimit = true;
+ agent->rag_has_bwlimit = strstr(errbuf, "bandwidth") != NULL ||
+ strstr(outbuf, "bandwidth") != NULL;
agent->rag_opts_checked = true;
{
struct resync_job *rj = (struct resync_job *)args;
struct resync_agent *agent = rj->rj_agent;
+ struct pool_list *pl = rj->rj_pool;
char errbuf[4096];
int resync = rj->rj_resync;
char cmd[PATH_MAX * 2];
if (rj->rj_setprefer) {
snprintf(cmd, sizeof(cmd),
- "lfs setstripe --comp-set --comp-flags=prefer --pool='%s' "
- "'%s/.lustre/fid/"DFID"'", rj->rj_pool,
- agent->rag_mountpoint,
- PFID(&rj->rj_fid));
- LX_DEBUG("set prefer on "DFID"\n",
+ "lfs setstripe --comp-set --comp-flags=prefer "
+ "--pool='%s' '%s/.lustre/fid/"DFID"'",
+ pl->pl_pool, agent->rag_mountpoint,
PFID(&rj->rj_fid));
+ LX_DEBUG("set prefer on "DFID"\n", PFID(&rj->rj_fid));
} else if (resync == AMIGO_RESYNC_EXTEND) {
int i;
i = snprintf(cmd, sizeof(cmd),
"%s --pool='%s'",
- opt.o_mirror_cmd, rj->rj_pool);
+ opt.o_mirror_cmd, pl->pl_pool);
if (rj->rj_stripes > 0)
i += snprintf(cmd + i, sizeof(cmd) - i,
" --stripe-count=%d", rj->rj_stripes);
if (rj->rj_mirror_opts)
i += snprintf(cmd + i, sizeof(cmd) - i,
" --flags='%s'", rj->rj_mirror_opts);
- i += snprintf(cmd + i, sizeof(cmd) - i,
- " %s '%s/.lustre/fid/"DFID"'",
- agent->rag_extra_options,
- agent->rag_mountpoint,
- PFID(&rj->rj_fid));
+ snprintf(cmd + i, sizeof(cmd) - i,
+ " %s %s '%s/.lustre/fid/"DFID"'",
+ agent->rag_extra_options, pl->pl_compression,
+ agent->rag_mountpoint, PFID(&rj->rj_fid));
} else if (resync == AMIGO_RESYNC_RESYNC) {
snprintf(cmd, sizeof(cmd),
"%s %s '%s/.lustre/fid/"DFID"'",
rj->rj_start = time(NULL);
rj->rj_pool = NULL;
rj->rj_mirror_opts = NULL;
- a->rag_check_in_progress = 1;
+ a->rag_check_in_progress = true;
lipe_list_add_tail(&rj->rj_list, &lamigo_job_list);
}
}
srj->rj_fid = rj->rj_fid;
srj->rj_setprefer = 1;
/* XXX: few src pools? */
- srj->rj_pool = fast_pools->pl_pool;
+ srj->rj_pool = fast_pools;
lipe_list_add_tail(&srj->rj_list, &lamigo_failed_job_list);
lamigo_jobs_delayed++;
rj->rj_stripe_size = mo.mo_stripe_size;
rj->rj_index = f->fr_index;
rj->rj_resync = resync;
- rj->rj_pool = opt.o_slow_pool;
+ rj->rj_pool = slow_pools;
if (mo.mo_need_prefer && mo.mo_src_pool) {
rj->rj_callback_data = mo.mo_src_pool;
rj->rj_callback = lamigo_schedule_setprefer;
rj->rj_try++;
if (rj->rj_check_job) {
- rj->rj_agent->rag_check_in_progress = 0;
+ rj->rj_agent->rag_check_in_progress = false;
if (retval == 0) {
/* the agent is back */
if (rj->rj_agent->rag_bad) {
a->rag_jobs = 0;
a->rag_bad = true;
a->rag_opts_checked = false;
- a->rag_check_in_progress = 0;
+ a->rag_check_in_progress = false;
pthread_mutex_init(&a->rag_ssh_lock, NULL);
pthread_cond_init(&a->rag_ssh_cond, NULL);
LIPE_INIT_LIST_HEAD(&a->rag_ssh_list);
struct pool_list *pl;
pl = xcalloc(sizeof(*pl), 1);
- pl->pl_pool = xstrdup(pool);
+ snprintf(pl->pl_pool, sizeof(pl->pl_pool), "%s", pool);
pl->pl_ostnr = 0;
pl->pl_osts = NULL;
pthread_rwlock_init(&pl->pl_lock, NULL);
return pl;
}
-static void lamigo_add_fast_pool(const char *pool)
+static void lamigo_add_pool(struct pool_list **pool_list, const char *pool,
+ char *compression)
{
- struct pool_list *pl;
+ struct pool_list *pl = *pool_list;
/* check for duplicates */
- pl = fast_pools;
while (pl) {
if (strcmp(pl->pl_pool, pool) == 0)
return;
if (!pl)
return;
- pl->pl_is_fast = true; /* Faster than slow, anyway. */
- pl->pl_next = fast_pools;
- fast_pools = pl;
+ pl->pl_is_fast = pool_list == &fast_pools;
+ snprintf(pl->pl_compression, sizeof(pl->pl_compression), "%s", compression);
+ pl->pl_next = *pool_list;
+ *pool_list = pl;
+}
+
+static void lamigo_add_fast_pool(const char *pool)
+{
+ /* No compression in fast pool */
+ lamigo_add_pool(&fast_pools, pool, "");
+}
+
+static void lamigo_add_slow_pool(char *pool, char *compression, char *level)
+{
+ char compr[COMPRESS_STR_SIZE] = {'\0'};
+
+ if (compression != NULL) {
+ int rc;
+
+ if (level != NULL)
+ rc = snprintf(compr, sizeof(compr),
+ "-Z %s:%s", compression, level);
+ else
+ rc = snprintf(compr, sizeof(compr), "-Z %s", compression);
+ if (rc <= 0 || rc >= sizeof(compr))
+ LX_FATAL("invalid slow pool compression\n");
+ }
+ lamigo_add_pool(&slow_pools, pool, compr);
}
static void lamigo_refresh_osts_from_pool(struct pool_list *pl)
char *endptr;
switch (c) {
- char *host, *mnt, *jobs;
+ char *host, *mnt, *jobs, *compression, *pool, *level;
case LAMIGO_OPT_IGNORE_READS:
opt.o_alr_ignore_reads = 1;
break;
LX_WARN("options '-t' and '--tgt' are deprecated, please use --slow-pool instead\n");
/* Fall through. */
case LAMIGO_OPT_SLOW_POOL:
- opt.o_slow_pool = strdup(optarg);
+ compression = NULL;
+ level = NULL;
+
+ if (optarg != NULL) {
+ if (strchr(optarg, ':') == NULL) {
+ pool = optarg;
+ } else {
+ pool = strsep(&optarg, ":");
+ compression = strsep(&optarg, ":");
+ level = strsep(&optarg, ":");
+ }
+ } else {
+ pool = DEF_SLOW_POOL;
+ LX_WARN("slow pool is not defined, using '%s'\n",
+ DEF_SLOW_POOL);
+ }
+ lamigo_add_slow_pool(pool, compression, level);
break;
case 'u':
opt.o_changelog_user = xstrdup(optarg);
f = fopen(name, "r");
if (!f)
- LX_FATAL("cannot open config file '%s': %s\n", name, strerror(errno));
+ LX_FATAL("cannot open config file '%s': %s\n",
+ name, strerror(errno));
while (!feof(f)) {
struct option *opt;
LX_WARN("fast pools aren't defined, using '%s'\n", DEF_FAST_POOL);
}
- if (opt.o_slow_pool == NULL) {
- opt.o_slow_pool = DEF_SLOW_POOL;
- LX_WARN("slow pool is not defined, using %s\n", opt.o_slow_pool);
+ if (slow_pools == NULL) {
+ lamigo_add_slow_pool(DEF_SLOW_POOL, NULL, NULL);
+ LX_WARN("slow pool is not defined, using %s\n", DEF_SLOW_POOL);
}
- if (lamigo_lookup_fast_pool(opt.o_slow_pool))
+ if (lamigo_lookup_fast_pool(slow_pools->pl_pool))
LX_FATAL("slow pool '%s' cannot also be fast pool\n",
- opt.o_slow_pool);
+ slow_pools->pl_pool);
if (lipe_list_empty(&lamigo_agent_list))
LX_FATAL("no agents configured\n");
- LX_DEBUG("slow pool '%s'\n", opt.o_slow_pool);
- slow_pools = lamigo_alloc_pool(opt.o_slow_pool);
+ LX_DEBUG("slow pool '%s'\n", slow_pools->pl_pool);
if (!opt.o_dump_file) {
snprintf(buf, sizeof(buf), LAMIGO_DUMPFILE, opt.o_mdtname);
rj->rj_stripes = mo->mo_stripes;
rj->rj_stripe_size = mo->mo_stripe_size;
rj->rj_resync = resync;
- rj->rj_pool = slow_pools->pl_pool;
+ rj->rj_pool = slow_pools;
rj->rj_callback = lamigo_alr_mirror_cb;
return lamigo_submit_job(rj);
goto out;
/* Quit on error since rules will only match the files with
- * valid attrs */
+ * valid attrs */
rule.lr_attr_bits = lamigo_rule_attrs;
rc = lipe_rule_read_attrs(NULL /* policy */,
object,
rc = lamigo_read_param("mntdev", mdt_mntdev, sizeof(buf));
if (rc) {
- LX_ERROR("cannot read 'mntdev' param for device '%s': %s\n", opt.o_mdtname, strerror(-rc));
+ LX_ERROR("cannot read 'mntdev' param for device '%s': %s\n",
+ opt.o_mdtname, strerror(-rc));
return rc;
}
strcpy(instance.li_device, mdt_mntdev);
rj->rj_resync = sync;
rj->rj_check_job = 0;
rj->rj_pid = 0;
- rj->rj_pool = pl->pl_pool;
+ rj->rj_pool = pl;
/* only fast pool mirrors should be marked "prefer" */
rj->rj_mirror_opts = pl->pl_is_fast ? "prefer" : NULL;
rj->rj_callback = lamigo_alr_mirror_cb;
LAMIGO_EXTRA=${LAMIGO_EXTRA:-""}
LAMIGO_CLIENTS=${CLIENTS:-"$HOSTNAME"}
LAMIGO_CLIENTS=${LAMIGO_CLIENTS//,/ }
+# pool compression "--slow-pool=name:lz4:level
+LAMIGO_COMPRESSION=${LAMIGO_COMPRESSION:-""}
LAMIGO_STATS_MISSING=${LAMIGO_STATS_MISSING:-""}
declare -a LAMIGO_MDT
cmd+=${usr:+" -u $usr"}
cmd+=${LAMIGO_SRC:+" --fast-pool=$LAMIGO_SRC"}
cmd+=${LAMIGO_TGT:+" --slow-pool=$LAMIGO_TGT"}
+ cmd+=${LAMIGO_COMPRESSION:+":$LAMIGO_COMPRESSION"}
cmd+=${LAMIGO_AGE:+" --min-age=$LAMIGO_AGE"}
cmd+=${LAMIGO_THREAD_NUM:+" --thread-number=$LAMIGO_THREAD_NUM"}
cmd+=${LAMIGO_CACHE:+" --max-cache=$LAMIGO_CACHE"}
echo user=\\\"$usr\\\" >> $cfg_file;
[[ -z \\\"$LAMIGO_SRC\\\" ]] ||
echo src=\\\"$LAMIGO_SRC\\\" >> $cfg_file;
- [[ -z \\\"$LAMIGO_TGT\\\" ]] ||
- echo tgt=\\\"$LAMIGO_TGT\\\" >> $cfg_file;
+ if [[ -z \\\"$LAMIGO_COMPRESSION\\\" ]]; then
+ [[ -z \\\"$LAMIGO_TGT\\\" ]] ||
+ echo slow-pool=\\\"$LAMIGO_TGT\\\" >> $cfg_file;
+ else
+ [[ -z \\\"$LAMIGO_TGT\\\" ]] ||
+ echo slow-pool=\\\"$LAMIGO_TGT:$LAMIGO_COMPRESSION\\\" \
+ >> $cfg_file;
+ fi
[[ -z \\\"$LAMIGO_AGE\\\" ]] ||
echo min-age=\\\"$LAMIGO_AGE\\\" >> $cfg_file;
[[ -z \\\"$LAMIGO_CACHE\\\" ]] ||
local i=${1:-0}
local mdt=${LAMIGO_MDT[i]}
local usr=${LAMIGO_USR[i]}
+ local compression=${LAMIGO_COMPRESSION:-"none"}
verify_one_lamigo_param $i chlg_user "$usr"
verify_one_lamigo_param $i mdtname "$mdt"
done
}
+verify_file_compress() {
+ local file="$1"
+ local compr="$2"
+ local lvl="$3"
+ local ids
+ local id
+
+ verify_mirror_count $file 2
+
+ ids=($($LFS getstripe $file | awk '/lcme_id/{print $2}' | tr '\n' ' '))
+ for id in "${ids[@]}"; do
+ # last mirror should be on target pool, but not marked prefer
+ if [[ "$id" = "${ids[${#ids[*]}-1]}" ]]; then
+ verify_comp_attr pool $file $id $LAMIGO_TGT
+ verify_comp_attr lcme_flags $file $id init,compress
+ verify_comp_attr compr-type $file $id "$compr"
+ verify_comp_attr compr-level $file $id "$lvl"
+ else
+ verify_comp_attr pool $file $id $LAMIGO_SRC
+ verify_comp_attr lcme_flags $file $id init
+ fi
+ done
+}
+
verify_file_size() {
local file="$1"
local expected="$2"
}
run_test 77 "lamigo: verify --resync-all-stales option"
+test_80() {
+ $LFS mirror extend 2>&1 | grep -q compress ||
+ skip "Client $client requires 'lfs mirror extend --compress' support"
+
+ local client=$HOSTNAME
+ local min_age=5
+ local compression="lz4:1"
+ local file=${DIR}/${tfile}
+ declare -a compr_arr
+
+ init_hot_pools_env
+
+ enable_compression
+ stack_trap "rm -fr $file; disable_compression"
+
+ LAMIGO_AGT_NODES="" \
+ LAMIGO_CLIENTS=$client \
+ LAMIGO_COMPRESSION="$compression" \
+ LAMIGO_AGE="${min_age}" \
+ start_lamigo_cmd
+ check_lamigo_is_started || error "failed to start lamigo"
+ stack_trap stop_lamigo_cmd
+
+ verify_one_lamigo_param 0 slow_pool_compression "$compression"
+
+ $LFS setstripe -p $LAMIGO_SRC $file || error "$LFS setstripe $file failed"
+ dd if=/dev/zero of=$file bs=32KB count=1 oflag=direct ||
+ error "cannot create '$file'"
+ wait_file_mirror $file 2 $((min_age * 2))
+
+ compr_arr=(${compression//:/ })
+
+ verify_file_compress $file ${compr_arr[0]} ${compr_arr[1]}
+}
+run_test 80 "test slow pool compression option"
+
complete_test $SECONDS
check_and_cleanup_lustre
exit_status