From 2490b3d32f04001e4fa52429e53e73b2f45a625a Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Sat, 22 Jul 2023 12:51:56 +0300 Subject: [PATCH] EX-7948 utils: lamigo to track mirror progress Pass --stats to the lfs mirror extend/resync commands and then read lfs's output over the ssh channel. This way we can keep the ssh channel alive and interrupt a replication if it doesn't report progress. The very first time an agent is used, lamigo checks whether the agent's lfs utility supports stats. Signed-off-by: Alex Zhuravlev Change-Id: Iee5b43cb85dae62550d74667b16e00336f1bf52f Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/51744 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Alexandre Ioffe Reviewed-by: Andreas Dilger --- lipe/src/lamigo.c | 312 +++++++++++++++++++++++++++++++++++++++++----- lipe/src/lamigo.h | 1 + lipe/src/lamigo_alr.c | 4 +- lipe/src/lipe_ssh.c | 31 ++--- lipe/src/lipe_ssh.h | 14 +-- lipe/src/lx_log.h | 1 + lustre/tests/hot-pools.sh | 156 +++++++++++++++++++++++ 7 files changed, 452 insertions(+), 67 deletions(-) diff --git a/lipe/src/lamigo.c b/lipe/src/lamigo.c index b82dc2b..90e269f 100644 --- a/lipe/src/lamigo.c +++ b/lipe/src/lamigo.c @@ -63,6 +63,7 @@ #include "list.h" #include "lipe_config.h" #include "lipe_version.h" +#include <yaml.h> #ifndef swap # define swap(a, b) \ @@ -93,6 +94,7 @@ #define DEF_FAST_POOL_MAX_USED 30 /* open for migration if % space used is less than */ #define DEF_SLOW_POOL_MAX_USED 90 /* open for migration if % space used is less than */ #define DEF_SSH_EXEC_TO_SECS (10 * 60) /* ssh exec timeout */ +#define DEF_MIRROR_STATS_SEC 3 /* mirror extend/resync stats interval */ #define LAMIGO_USERFILE "/var/lib/lamigo-%s.chlg" #define LAMIGO_DUMPFILE "/var/run/lamigo-%s.stats" @@ -198,11 +200,15 @@ struct resync_agent { int rag_jobs; bool rag_bad; bool rag_is_local; + bool rag_opts_checked; + bool rag_has_bwlimit; + bool rag_has_stats; int rag_check_in_progress; struct lipe_list_head rag_list; struct lipe_list_head rag_ssh_list; 
pthread_mutex_t rag_ssh_lock; pthread_cond_t rag_ssh_cond; + char rag_extra_options[128]; }; struct resync_job; @@ -224,6 +230,15 @@ struct resync_job { void *rj_callback_data; time_t rj_start; time_t rj_done_timestamp; + /* progress fields as reported by lfs --stats */ + int rj_dont_parse; + int rj_seconds; + int rj_rmbps; + int rj_wmbps; + int rj_copied; + int rj_size; + int rj_pct; + int rj_try; }; enum amigo_resync_type { @@ -267,6 +282,7 @@ struct options opt = { .o_alr_warmup_k = DEF_WARMUP_K, .o_alr_cooldown_k = DEF_COOLDOWN_K, .o_alr_ofd_interval = 0, + .o_mirror_stats_sec = DEF_MIRROR_STATS_SEC, .o_alr_hot_fraction = DEF_HOT_FRACTION, .o_alr_hot_after_idle = DEF_HOT_AFTER_IDLE, .o_include_dom = 0, @@ -304,6 +320,7 @@ struct stats { unsigned long s_skip_insync; unsigned long s_skip_by_source; unsigned long s_extend_by_target; + unsigned long s_job_timedout; struct history s_hist[100]; int s_hist_cur; }; @@ -439,7 +456,7 @@ static int systemf(const char *fmt, ...) } #define JOB_FMT \ - " job%u: { tid: %lu, fid: "DFID", index: %llu, agent: %d, start: %lu, done: %lu, command: %s }\n" + " job%u: { tid: %lu, fid: "DFID", index: %llu, agent: %d, start: %lu, done: %lu, try: %d, progress: %d%%, command: %s }\n" static void lamigo_dump_jobs(FILE *out, struct lipe_list_head *jlist) { @@ -457,7 +474,8 @@ static void lamigo_dump_jobs(FILE *out, struct lipe_list_head *jlist) fprintf(out, JOB_FMT, i++, j->rj_pid, PFID(&j->rj_fid), j->rj_index, j->rj_agent ? 
j->rj_agent->rag_index : -1, - now - j->rj_start, now - j->rj_done_timestamp, cmd); + now - j->rj_start, now - j->rj_done_timestamp, + j->rj_try, j->rj_pct, cmd); } } @@ -694,7 +712,8 @@ static void lamigo_dump_stats_file(void) " resync-stale: %lu\n" " skip-insync: %lu\n" " skip-by-source: %lu\n" - " extend-by-target: %lu\n", + " extend-by-target: %lu\n" + " job-timedout: %lu\n", stats.s_scan_begin, stats.s_scan_end, stats.s_scan_replicated, @@ -709,7 +728,8 @@ static void lamigo_dump_stats_file(void) stats.s_skip_by_rule, stats.s_extend_by_pool, stats.s_extend_by_objects, stats.s_skip_unknown, stats.s_resync_stale, stats.s_skip_insync, - stats.s_skip_by_source, stats.s_extend_by_target); + stats.s_skip_by_source, stats.s_extend_by_target, + stats.s_job_timedout); localtime_r(&lamigo_last_cleared, &tmtime); strftime(timestr, sizeof(timestr), "%c", &tmtime); @@ -783,13 +803,94 @@ static int lamigo_init_cache(void) return 0; } -static int lamigo_exec_cmd(struct resync_agent *a, const char *cmd, - int *pstatus, char *outbuf, int outbuf_size, - char *errbuf, int errbuf_size) +static int lamigo_mirror_stats_parse(void *cbdata, + unsigned char *orig, int nr) +{ + struct resync_job *rj = (struct resync_job *)cbdata; + yaml_parser_t parser; + yaml_token_t token; + char *buffer = NULL; + int *keyptr = NULL; + int rc = 0; + + if (rj->rj_dont_parse) + return 0; + + buffer = xstrdup(orig); + + if (!yaml_parser_initialize(&parser)) { + LX_ERROR("can't initialize yaml parser\n"); + rj->rj_dont_parse = 1; + goto out; + } + token.type = 0; + yaml_parser_set_input_string(&parser, buffer, nr); + + while (token.type != YAML_STREAM_END_TOKEN && rc == 0) { + yaml_token_delete(&token); + if (!yaml_parser_scan(&parser, &token)) { + LX_ERROR("can't parse yaml: %s\n", orig); + rj->rj_dont_parse = 1; + break; + } + if (token.type == YAML_KEY_TOKEN) { + yaml_token_delete(&token); + if (!yaml_parser_scan(&parser, &token)) { + LX_ERROR("can't parse yaml: %s\n", orig); + rj->rj_dont_parse = 1; 
+ break; + } + if (token.type == YAML_SCALAR_TOKEN) { + char *k = token.data.scalar.value; + if (!strcmp(k, "seconds")) { + keyptr = &rj->rj_seconds; + } else if (!strcmp(k, "rmbps")) { + keyptr = &rj->rj_rmbps; + } else if (!strcmp(k, "wmbps")) { + keyptr = &rj->rj_wmbps; + } else if (!strcmp(k, "copied")) { + keyptr = &rj->rj_copied; + } else if (!strcmp(k, "size")) { + keyptr = &rj->rj_size; + } else if (!strcmp(k, "pct")) { + keyptr = &rj->rj_pct; + } else { + keyptr = NULL; + } + } + continue; + } + if (token.type == YAML_VALUE_TOKEN) { + yaml_token_delete(&token); + if (!yaml_parser_scan(&parser, &token)) { + LX_ERROR("can't parse yaml: %s\n", orig); + rj->rj_dont_parse = 1; + break; + } + if (token.type == YAML_SCALAR_TOKEN) { + if (keyptr) + *keyptr = atoi(token.data.scalar.value); + keyptr = NULL; + } + continue; + } + if (token.type == YAML_FLOW_ENTRY_TOKEN) + continue; + keyptr = NULL; + } + + yaml_token_delete(&token); + yaml_parser_delete(&parser); + LX_DEBUG("%d seconds, %d pct\n", rj->rj_seconds, rj->rj_pct); + +out: + free(buffer); + return 0; +} + +static struct resync_ssh_session *lamigo_get_session(struct resync_agent *a) { struct resync_ssh_session *rss; - int rc; - char cmd1[PATH_MAX * 2]; pthread_mutex_lock(&a->rag_ssh_lock); /* @@ -804,27 +905,142 @@ static int lamigo_exec_cmd(struct resync_agent *a, const char *cmd, lipe_list_del(&rss->rss_list); pthread_mutex_unlock(&a->rag_ssh_lock); - /* Add lamigo identification */ - snprintf(cmd1, sizeof(cmd1), "%s # lamigo %s", cmd, opt.o_mdtname); - - rc = a->rag_is_local ? 
- system(cmd1) : - lipe_ssh_exec_timeout(&rss->rss_ctx, cmd1, pstatus, - outbuf, outbuf_size, errbuf, errbuf_size, - opt.o_ssh_exec_to); - if (rc) - llapi_error(LLAPI_MSG_INFO, rc, - "error executing ssh command '%s' on '%s'", - cmd, a->rag_hostname); + return rss; +} +static void lamigo_put_session(struct resync_agent *a, + struct resync_ssh_session *rss) +{ pthread_mutex_lock(&a->rag_ssh_lock); lipe_list_add(&rss->rss_list, &a->rag_ssh_list); pthread_cond_signal(&a->rag_ssh_cond); pthread_mutex_unlock(&a->rag_ssh_lock); +} + +static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd, + int *pstatus, char *outbuf, int outbuf_size, + char *errbuf, int errbuf_size) +{ + struct resync_agent *a = rj->rj_agent ; + struct resync_ssh_session *rss; + int rc, timeout; + char cmd1[PATH_MAX * 2]; + ssh_channel channel = NULL; + char buf[1024]; + int nr, last_read; + + rss = lamigo_get_session(a); + + /* Add lamigo identification */ + snprintf(cmd1, sizeof(cmd1), "%s # lamigo %s", cmd, opt.o_mdtname); + + if (a->rag_is_local) { + /* XXX: parse stats output */ + rc = system(cmd1); + goto out; + } + + timeout = opt.o_ssh_exec_to; + if (a->rag_has_stats) + timeout = opt.o_mirror_stats_sec * 2; + + rc = lipe_ssh_start_cmd(&rss->rss_ctx, cmd1, &channel, + LIPE_SSH_TIMEOUT_INFINITE); + if (rc != SSH_OK) + goto out; + + assert(timeout > 0); + last_read = time(NULL); + + while (rc >= 0) { + rc = ssh_channel_read_timeout(channel, buf, + sizeof(buf) - 1, 0, + timeout * 1000); + if (rc > 0) { + last_read = time(NULL); + buf[rc] = '\0'; + lamigo_mirror_stats_parse(rj, buf, rc); + continue; + } + if (ssh_channel_is_eof(channel)) { + /* end of execution */ + break; + } + if (rc == 0 && (time(NULL) - last_read) >= timeout) { + LX_ERROR("error: no progress updates for %s\n", cmd1); + rc = -ETIMEDOUT; + goto out_ssh; + } + } + + rc = ssh_channel_get_exit_status(channel); + if (rc < 0) { + LX_ERROR("lipe_ssh_session_start_cmd failed: %d: %s\n", + rc, 
ssh_get_error(rss->rss_ctx.lsc_session)); + rc = SSH_ERROR; + goto out_ssh; + } + + *pstatus = rc; + rc = SSH_OK; + +out_ssh: + ssh_channel_send_eof(channel); + ssh_channel_close(channel); + ssh_channel_free(channel); + +out: + if (rc) + LX_ERROR("error executing ssh command '%s' on '%s'\n", + cmd, a->rag_hostname); + + lamigo_put_session(a, rss); return rc; } +static void lamigo_agent_check_options(struct resync_agent *agent) +{ + struct resync_ssh_session *rss; + char outbuf[4096] = "", errbuf[4096] = ""; /* errbuf is scanned below even when the local system() path never fills it */ + int status = INT_MAX; + int rc; + + if (agent->rag_opts_checked == true) + return; + + agent->rag_has_bwlimit = false; + agent->rag_has_stats = false; + + rss = lamigo_get_session(agent); + + if (agent->rag_is_local) { + /* XXX: grab stdout/stderr */ + rc = system(opt.o_mirror_cmd); + } else { + rc = lipe_ssh_exec(&rss->rss_ctx, opt.o_mirror_cmd, + &status, outbuf, sizeof(outbuf), + errbuf, sizeof(errbuf), + opt.o_ssh_exec_to); + } + + agent->rag_extra_options[0] = '\0'; + if (rc == 0) { + if (strstr(errbuf, "stats-interval") != NULL) { + rc = snprintf(agent->rag_extra_options, + sizeof(agent->rag_extra_options) - 1, + "--stats --stats-interval=%d", + opt.o_mirror_stats_sec); + agent->rag_has_stats = true; + } + if (strstr(errbuf, "bandwidth") != NULL) + agent->rag_has_bwlimit = true; + agent->rag_opts_checked = true; + } + + lamigo_put_session(agent, rss); +} + static void *lamigo_replicate_one(void *args) { struct resync_job *rj = (struct resync_job *)args; @@ -835,6 +1051,8 @@ static void *lamigo_replicate_one(void *args) int status = INT_MAX; int rc; + lamigo_agent_check_options(agent); + if (rj->rj_setprefer) { snprintf(cmd, sizeof(cmd), "lfs setstripe --comp-set --comp-flags=prefer --pool='%s' " @@ -859,12 +1077,16 @@ static void *lamigo_replicate_one(void *args) i += snprintf(cmd + i, sizeof(cmd) - i, " --flags='%s'", rj->rj_mirror_opts); i += snprintf(cmd + i, sizeof(cmd) - i, - " '%s/.lustre/fid/"DFID"'", - agent->rag_mountpoint, PFID(&rj->rj_fid)); + " %s 
'%s/.lustre/fid/"DFID"'", + agent->rag_extra_options, + agent->rag_mountpoint, + PFID(&rj->rj_fid)); } else if (resync == AMIGO_RESYNC_RESYNC) { snprintf(cmd, sizeof(cmd), - "%s '%s/.lustre/fid/"DFID"'", - opt.o_resync_cmd, agent->rag_mountpoint, + "%s %s '%s/.lustre/fid/"DFID"'", + opt.o_resync_cmd, + agent->rag_extra_options, + agent->rag_mountpoint, PFID(&rj->rj_fid)); } else { LX_ERROR("unknown resync: %d\n", resync); @@ -874,7 +1096,7 @@ static void *lamigo_replicate_one(void *args) /* rc < 0 means an ssh error. Otherwise command exit status is * in status. Mask common exit statuses. */ - rc = lamigo_exec_cmd(agent, cmd, &status, outbuf, sizeof(outbuf), + rc = lamigo_exec_cmd(rj, cmd, &status, outbuf, sizeof(outbuf), errbuf, sizeof(errbuf)); LX_DEBUG("exec command '%s' on '%s': rc = %d, status = %d\n", cmd, agent->rag_hostname, rc, status); @@ -1429,15 +1651,24 @@ out: static void *lamigo_check_agent_func(void *args) { - char cmd[PATH_MAX]; struct resync_agent *a = (struct resync_agent *)args; + struct resync_ssh_session *rss; + char cmd[PATH_MAX]; int status = INT_MAX; int rc; snprintf(cmd, sizeof(cmd), "lfs path2fid '%s'", a->rag_mountpoint); - rc = lamigo_exec_cmd(a, cmd, &status, NULL, 0, NULL, 0); + rss = lamigo_get_session(a); + + if (a->rag_is_local) + rc = system(cmd); + else + rc = lipe_ssh_exec(&rss->rss_ctx, cmd, &status, + NULL, 0, NULL, 0, opt.o_ssh_exec_to); + + lamigo_put_session(a, rss); pthread_exit((void *)(intptr_t)(rc < 0 ? 
rc : status)); } @@ -1814,6 +2045,7 @@ static void lamigo_check_and_clear_changelog(void) static void lamigo_job_fini(struct resync_job *rj, intptr_t retval) { rj->rj_done_timestamp = time(NULL); + rj->rj_try++; if (rj->rj_check_job) { rj->rj_agent->rag_check_in_progress = 0; @@ -1834,6 +2066,7 @@ static void lamigo_job_fini(struct resync_job *rj, intptr_t retval) assert(lamigo_max_jobs >= rj->rj_agent->rag_maxjobs); lamigo_max_jobs -= rj->rj_agent->rag_maxjobs; rj->rj_agent->rag_bad = true; + rj->rj_agent->rag_opts_checked = false; } } free(rj); @@ -1857,18 +2090,30 @@ static void lamigo_job_fini(struct resync_job *rj, intptr_t retval) * in the changelog, we can just cancel our record */ stats.s_busy++; - } else if (retval == SSH_ERROR) { + } else if (retval == SSH_ERROR || retval == -ETIMEDOUT) { /* couldn't connect */ /* XXX: resubmit the job */ if (!rj->rj_agent->rag_bad) { rj->rj_agent->rag_bad = true; + rj->rj_agent->rag_opts_checked = false; lamigo_max_jobs -= rj->rj_agent->rag_maxjobs; } rj->rj_pid = 0; rj->rj_agent = NULL; - lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list); - lamigo_jobs_delayed++; - rj = NULL; + + /* + * if we interrupted the job with ETIMEDOUT, then + * ssh was doing OK and the changelog got new + * OPEN/CLOSE events which will cause another try + */ + if (retval == -ETIMEDOUT) + stats.s_job_timedout++; + if (retval == SSH_ERROR) { + lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list); + lamigo_jobs_delayed++; + rj = NULL; + } + } else if (retval == 127) { /* likely invalid setup on the agent (missing lfs?) 
*/ /* XXX: resubmit the job */ @@ -1939,6 +2184,7 @@ static void lamigo_add_agent(const char *host, const char *mnt, char *jobs) a->rag_jobs = 0; a->rag_bad = true; + a->rag_opts_checked = false; a->rag_check_in_progress = 0; pthread_mutex_init(&a->rag_ssh_lock, NULL); pthread_cond_init(&a->rag_ssh_cond, NULL); @@ -3142,7 +3388,6 @@ static void lamigo_changelog_user_register(void) } static unsigned long progress_timestamp; -static unsigned long progress_last_processed; static void lamigo_show_progress(void) { @@ -3151,7 +3396,6 @@ static void lamigo_show_progress(void) if (now - progress_timestamp < opt.o_progress_interval) return; progress_timestamp = now; - progress_last_processed = stats.s_processed; lamigo_alr_stat_avg(); diff --git a/lipe/src/lamigo.h b/lipe/src/lamigo.h index 1db61ce4..06bdfe3 100644 --- a/lipe/src/lamigo.h +++ b/lipe/src/lamigo.h @@ -117,6 +117,7 @@ struct options { int o_slow_pool_max_used; int o_progress_interval; /* how often to show progress */ long o_ssh_exec_to; /* ssh connection timeout for exec command */ + int o_mirror_stats_sec; /* extend/resync stats interval */ char *o_alr_cmd; char *o_alr_extra_args; int o_alr_periods; diff --git a/lipe/src/lamigo_alr.c b/lipe/src/lamigo_alr.c index b594767..0424d20 100644 --- a/lipe/src/lamigo_alr.c +++ b/lipe/src/lamigo_alr.c @@ -297,7 +297,7 @@ static int init_features(struct alr_agent *ala) snprintf(cmd, sizeof(cmd), "%s --help", opt.o_alr_cmd); - rc = lipe_ssh_start_cmd_timeout(&ala->ala_ctx, cmd, &channel, 5); + rc = lipe_ssh_start_cmd(&ala->ala_ctx, cmd, &channel, 5); if (rc != SSH_OK) { LX_ERROR("cannot start '%s' on host '%s': rc = %d\n", cmd, ala->ala_host, rc); @@ -374,7 +374,7 @@ static int lamigo_alr_agent_run(struct alr_agent *ala) mdtidx, opt.o_alr_extra_args); ala->ala_last_alr_restarted = time(NULL); - rc = lipe_ssh_start_cmd_timeout(&ala->ala_ctx, cmd, &channel, 5); + rc = lipe_ssh_start_cmd(&ala->ala_ctx, cmd, &channel, 5); if (rc != SSH_OK) { LX_ERROR("cannot start '%s' on 
host '%s': rc = %d\n", cmd, ala->ala_host, rc); diff --git a/lipe/src/lipe_ssh.c b/lipe/src/lipe_ssh.c index cd171e8..8430530 100644 --- a/lipe/src/lipe_ssh.c +++ b/lipe/src/lipe_ssh.c @@ -194,12 +194,11 @@ out: return rc; } -static int lipe_ssh_context_check(struct lipe_ssh_context *ctx) +int lipe_ssh_context_check(struct lipe_ssh_context *ctx) { - if (ctx->lsc_session == NULL) - return lipe_ssh_session_create(&ctx->lsc_session, ctx->lsc_host); - - return SSH_OK; + if (ctx->lsc_session) + return SSH_OK; + return lipe_ssh_session_create(&ctx->lsc_session, ctx->lsc_host); } static void lipe_ssh_context_fail(struct lipe_ssh_context *ctx) @@ -226,8 +225,8 @@ int lipe_ssh_context_init(struct lipe_ssh_context *ctx, const char *host) return SSH_OK; } -int lipe_ssh_start_cmd_timeout(struct lipe_ssh_context *ctx, const char *cmd, - ssh_channel *pchannel, long timeout_sec) +int lipe_ssh_start_cmd(struct lipe_ssh_context *ctx, const char *cmd, + ssh_channel *pchannel, long timeout_sec) { int rc; @@ -254,14 +253,9 @@ int lipe_ssh_start_cmd_timeout(struct lipe_ssh_context *ctx, const char *cmd, return rc; } -int lipe_ssh_start_cmd(struct lipe_ssh_context *ctx, const char *cmd, ssh_channel *pchannel) -{ - return lipe_ssh_start_cmd_timeout(ctx, cmd, pchannel, LIPE_SSH_TIMEOUT_INFINITE); -} - -int lipe_ssh_exec_timeout(struct lipe_ssh_context *ctx, const char *cmd, int *pstatus, - char *outbuf, int outbuf_size, char *errbuf, int errbuf_size, - long timeout_sec) +int lipe_ssh_exec(struct lipe_ssh_context *ctx, const char *cmd, + int *pstatus, char *outbuf, int outbuf_size, + char *errbuf, int errbuf_size, long timeout_sec) { int rc; @@ -290,10 +284,3 @@ int lipe_ssh_exec_timeout(struct lipe_ssh_context *ctx, const char *cmd, int *ps return rc; } - -int lipe_ssh_exec(struct lipe_ssh_context *ctx, const char *cmd, int *pstatus, - char *outbuf, int outbuf_size, char *errbuf, int errbuf_size) -{ - return lipe_ssh_exec_timeout(ctx, cmd, pstatus, outbuf, outbuf_size, - errbuf, 
errbuf_size, LIPE_SSH_TIMEOUT_INFINITE); -} diff --git a/lipe/src/lipe_ssh.h b/lipe/src/lipe_ssh.h index 5067663..7c83f9b 100644 --- a/lipe/src/lipe_ssh.h +++ b/lipe/src/lipe_ssh.h @@ -17,14 +17,10 @@ struct lipe_ssh_context { int lipe_ssh_context_init(struct lipe_ssh_context *ctx, const char *host); void lipe_ssh_context_destroy(struct lipe_ssh_context *ctx); - -int lipe_ssh_exec_timeout(struct lipe_ssh_context *ctx, const char *cmd, - int *pstatus, char *outbuf, int outbuf_size, - char *errbuf, int errbuf_size, long timeout_sec); -int lipe_ssh_start_cmd_timeout(struct lipe_ssh_context *ctx, const char *cmd, - ssh_channel *pchannel, long timeout_sec); -int lipe_ssh_exec(struct lipe_ssh_context *ctx, const char *cmd, int *pstatus, - char *outbuf, int outbuf_size, char *errbuf, int errbuf_size); -int lipe_ssh_start_cmd(struct lipe_ssh_context *ctx, const char *cmd, ssh_channel *pchannel); +int lipe_ssh_exec(struct lipe_ssh_context *ctx, const char *cmd, + int *pstatus, char *outbuf, int outbuf_size, + char *errbuf, int errbuf_size, long timeout_sec); +int lipe_ssh_start_cmd(struct lipe_ssh_context *ctx, const char *cmd, + ssh_channel *pchannel, long timeout_sec); #endif /* _LIPE_SSH_H_ */ diff --git a/lipe/src/lx_log.h b/lipe/src/lx_log.h index 391480d..4c80044 100644 --- a/lipe/src/lx_log.h +++ b/lipe/src/lx_log.h @@ -44,6 +44,7 @@ static inline double lx_now(void) lx_log_prefix ? lx_log_prefix : "", \ lx_log_prefix ? 
": " : "", \ ##args); \ + fflush(stderr); \ } \ errno = _lx_print_saved_errno; \ } while (0) diff --git a/lustre/tests/hot-pools.sh b/lustre/tests/hot-pools.sh index 2674a94..af9609a 100755 --- a/lustre/tests/hot-pools.sh +++ b/lustre/tests/hot-pools.sh @@ -57,6 +57,7 @@ LAMIGO_THREAD_NUM=${LAMIGO_THREAD_NUM:-""} LAMIGO_PROG_INTV=${LAMIGO_PROG_INTV:-""} LAMIGO_CACHE=${LAMIGO_CACHE:-""} LAMIGO_MIRROR_CMD=${LAMIGO_MIRROR_CMD:-""} +LAMIGO_RESYNC_CMD=${LAMIGO_RESYNC_CMD:-""} LAMIGO_DEBUG=${LAMIGO_DEBUG:-true} LAMIGO_RESCAN=${LAMIGO_RESCAN:-false} LAMIGO_TIMESTAMPS=${LAMIGO_TIMESTAMPS:-true} @@ -276,6 +277,7 @@ start_one_lamigo_cmd() { cmd+=${LAMIGO_CACHE:+" --max-cache=$LAMIGO_CACHE"} cmd+=${LAMIGO_PROG_INTV:+" --progress-interval=$LAMIGO_PROG_INTV"} cmd+=${LAMIGO_MIRROR_CMD:+" --mirror-cmd=\"$LAMIGO_MIRROR_CMD\""} + cmd+=${LAMIGO_RESYNC_CMD:+" --resync-cmd=\"$LAMIGO_RESYNC_CMD\""} cmd+=${LAMIGO_DUMP:+" -w ${LAMIGO_DUMP}.$mdt"} @@ -363,6 +365,8 @@ create_one_lamigo_cfg() { echo \\\"$LAMIGO_EXTRA\\\" >> $cfg_file; [[ -z \\\"$LAMIGO_MIRROR_CMD\\\" ]] || echo mirror-cmd=$LAMIGO_MIRROR_CMD >> $cfg_file; + [[ -z \\\"$LAMIGO_RESYNC_CMD\\\" ]] || + echo resync-cmd=$LAMIGO_RESYNC_CMD >> $cfg_file; [[ -z \\\"$LAMIGO_DUMP\\\" ]] || echo dump=\\\"${LAMIGO_DUMP}.$mdt\\\" >> $cfg_file; ! $LAMIGO_DEBUG || echo debug >> $cfg_file; @@ -2344,6 +2348,158 @@ test_74() { } run_test 74 "ofd keepalive message" +test_75a() { + local tf=$DIR/$tfile + + init_hot_pools_env + LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M" \ + start_lamigo_cfg + check_lamigo_is_started || { + local facet=${LAMIGO_MDT_FACET[0]} + local log_file=$(lamigo_logfile $facet) + echo "!!!! LAMIGO LOG !!!!" + do_facet $facet "cat $log_file" + echo "!!!! LAMIGO LOG END !!!!" 
+ error "failed to start lamigo" + } + stack_trap stop_lamigo_cmd + + $LFS setstripe -p $LAMIGO_SRC $tf || + error "$LFS setstripe $tf failed" + dd if=/dev/zero of=$tf bs=1M count=32 || + error "can't dd" + cancel_lru_locks osc + + local FID=$($LFS path2fid $tf) + [[ -z $FID ]] && error "can't get fid for $tf" + FID=${FID##[} + FID=${FID%%]} + + local js + local pct + for ((i=0; i < (($LAMIGO_AGE*2)); i++)); do + js=$(dump_lamigo_stats | grep -E "job[0-9]*:.*fid:.*$FID") + [[ $js == *progress:* ]] && { + pct=${js##*progress: } + pct=${pct%%%,*} + (( pct > 0 && pct < 100 )) && + echo "got $pct%" && break + } + sleep 1 + done + (( pct > 0 && pct < 100 )) || { + local facet=${LAMIGO_MDT_FACET[0]} + local log_file=$(lamigo_logfile $facet) + echo "!!!! ======= LAMIGO LOG for $facet =======" + do_facet $facet "cat $log_file" + echo "!!!! ======= LAMIGO LOG for $facet =======" + facet=${LAMIGO_MDT_FACET[1]} + log_file=$(lamigo_logfile $facet) + echo "!!!! ======= LAMIGO LOG for $facet =======" + do_facet $facet "cat $log_file" + echo "!!!! ======= LAMIGO LOG for $facet =======" + error "can't get job completion's percent" + } +} +run_test 75a "lamigo to parse mirroring progress" + +test_75b() { + local tf=$DIR/$tfile + + init_hot_pools_env + LAMIGO_RESYNC_CMD="lfs mirror resync -W1M" start_lamigo_cfg + check_lamigo_is_started || { + local facet=${LAMIGO_MDT_FACET[0]} + local log_file=$(lamigo_logfile $facet) + echo "!!!! LAMIGO LOG !!!!" + do_facet $facet "cat $log_file" + echo "!!!! LAMIGO LOG END !!!!" + error "failed to start lamigo" + } + stack_trap stop_lamigo_cfg + + $LFS setstripe -p $LAMIGO_SRC $tf || + error "$LFS setstripe $tf failed" + dd if=/dev/zero of=$tf bs=1M count=32 || + error "can't dd" + $LFS mirror extend -N -p $LAMIGO_TGT $tf || + error "can't make replica" + verify_mirror_count $tf 2 + + # invalidate replica + echo 123 >>$tf + $LFS getstripe -v $tf | grep flag.*stale || { + $LFS getstripe -v $tf + error "not invalidated?" 
+ } + + local FID=$($LFS path2fid $tf) + [[ -z $FID ]] && error "can't get fid for $tf" + FID=${FID##[} + FID=${FID%%]} + + local js + local pct + for ((i=0; i < (($LAMIGO_AGE*2)); i++)); do + js=$(dump_lamigo_stats | grep -E "job[0-9]*:.*fid:.*$FID") + [[ $js == *progress:* ]] && { + pct=${js##*progress: } + pct=${pct%%%,*} + (( pct > 0 && pct < 100 )) && + echo "got $pct%" && break + } + sleep 2 + done + (( pct > 0 && pct < 100 )) || { + $LFS getstripe -v $tf + local facet=${LAMIGO_MDT_FACET[0]} + local log_file=$(lamigo_logfile $facet) + echo "!!!! ======= LAMIGO LOG for $facet =======" + do_facet $facet "cat $log_file" + echo "!!!! ======= LAMIGO LOG for $facet =======" + facet=${LAMIGO_MDT_FACET[1]} + log_file=$(lamigo_logfile $facet) + echo "!!!! ======= LAMIGO LOG for $facet =======" + do_facet $facet "cat $log_file" + echo "!!!! ======= LAMIGO LOG for $facet =======" + error "can't get job completion's percent" + } +} +run_test 75b "lamigo to parse mirror resync progress" + +test_75c() { + local tf=$DIR/$tfile + + init_hot_pools_env + + # redirect stats to /dev/null + LAMIGO_AGE=5 LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M \>/dev/null" \ + start_lamigo_service + check_lamigo_is_started || error "failed to start lamigo" + stack_trap stop_lamigo_service + + $LFS setstripe -p $LAMIGO_SRC $tf || + error "$LFS setstripe $tf failed" + dd if=/dev/zero of=$tf bs=1M count=32 || + error "can't dd" + cancel_lru_locks osc + + local FID=$($LFS path2fid $tf) + [[ -z $FID ]] && error "can't get fid for $tf" + + local nr + nr=$(dump_one_lamigo_stats | awk '/job-timedout:/{print $2}') + (( $nr == 0 )) || error "unexptected job-timedout: $nr" + for ((i=0; i < (($LAMIGO_AGE*2)); i++)); do + nr=$(dump_one_lamigo_stats | awk '/job-timedout:/{print $2}') + (( $nr > 0 )) && break + dump_one_lamigo_stats | grep job-timedout + sleep 2 + done + (( $nr > 0 )) || error "no timedout jobs" +} +run_test 75c "lamigo to timeout if no progress reported from mirror extend" + test_76() { 
local td=$DIR/$tdir local tf=$td/$tfile -- 1.8.3.1