From 27ebdf8e7428fede98f1d8885c48fe3ab2e28ce0 Mon Sep 17 00:00:00 2001 From: Alexandre Ioffe Date: Sat, 25 Nov 2023 00:36:42 -0800 Subject: [PATCH] EX-8590 lipe: Use SSH poll to read stdout/err unblocking Limit to use only one client machine for hot-pools tests 75* Fix skip condition for tests 75a,b,c when bandwidth limit options are not available. Use ssh poll and unblocking read to read stdout/err in loop to prevent losing the output when it is not ready. Test-Parameters: trivial testlist=hot-pools Test-Parameters: testlist=hot-pools env=ONLY=75a,ONLY_REPEAT=82 Test-Parameters: testlist=hot-pools env=ONLY=75b,ONLY_REPEAT=82 Signed-off-by: Alexandre Ioffe Change-Id: Ibe07cdd51197c1f3c048b7fcdab6caff850067e7 Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/53288 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Andreas Dilger --- lipe/src/lamigo.c | 117 +++++++++++++++++++--------------------------- lipe/src/lipe_ssh.c | 100 ++++++++++++++++++++++++++++++++------- lipe/src/lipe_ssh.h | 4 ++ lustre/tests/hot-pools.sh | 10 ++-- 4 files changed, 139 insertions(+), 92 deletions(-) diff --git a/lipe/src/lamigo.c b/lipe/src/lamigo.c index f8518e8..07f96aa 100644 --- a/lipe/src/lamigo.c +++ b/lipe/src/lamigo.c @@ -805,6 +805,10 @@ static int lamigo_init_cache(void) return 0; } +/* + * Return: 0 - continue parse + * 1 - stop parsing + */ static int lamigo_mirror_stats_parse(void *cbdata, unsigned char *orig, int nr) { @@ -816,12 +820,13 @@ static int lamigo_mirror_stats_parse(void *cbdata, int rc = 0; if (rj->rj_dont_parse) - return 0; + return 1; buffer = xstrdup(orig); if (!yaml_parser_initialize(&parser)) { LX_ERROR("can't initialize yaml parser\n"); rj->rj_dont_parse = 1; + rc = 1; goto out; } token.type = 0; @@ -832,6 +837,7 @@ static int lamigo_mirror_stats_parse(void *cbdata, if (!yaml_parser_scan(&parser, &token)) { LX_ERROR("can't parse yaml: %s\n", orig); rj->rj_dont_parse = 1; + rc = 1; break; } if (token.type == 
YAML_KEY_TOKEN) { @@ -839,6 +845,7 @@ static int lamigo_mirror_stats_parse(void *cbdata, if (!yaml_parser_scan(&parser, &token)) { LX_ERROR("can't parse yaml: %s\n", orig); rj->rj_dont_parse = 1; + rc = 1; break; } if (token.type == YAML_SCALAR_TOKEN) { @@ -866,6 +873,7 @@ static int lamigo_mirror_stats_parse(void *cbdata, if (!yaml_parser_scan(&parser, &token)) { LX_ERROR("can't parse yaml: %s\n", orig); rj->rj_dont_parse = 1; + rc = 1; break; } if (token.type == YAML_SCALAR_TOKEN) { @@ -886,7 +894,7 @@ static int lamigo_mirror_stats_parse(void *cbdata, out: free(buffer); - return 0; + return rc; } static struct resync_ssh_session *lamigo_get_session(struct resync_agent *a) @@ -923,10 +931,10 @@ static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd, int *pstatus, { struct resync_agent *a = rj->rj_agent; struct resync_ssh_session *rss; - int rc, timeout_msec; + int rc, timeout_sec; char cmd1[PATH_MAX * 2]; ssh_channel channel = NULL; - char buf[1024]; + unsigned char buf[1024]; int nr; rss = lamigo_get_session(a); @@ -948,51 +956,20 @@ static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd, int *pstatus, goto out; } - timeout_msec = opt.o_ssh_exec_to * 1000; + timeout_sec = opt.o_ssh_exec_to; if (a->rag_has_stats) - timeout_msec = opt.o_mirror_stats_sec * opt.o_stats_missed * 1000; - - assert(timeout_msec > 0); + timeout_sec = opt.o_mirror_stats_sec * opt.o_stats_missed; - while (ssh_channel_is_open(channel) && !ssh_channel_is_eof(channel)) { - rc = ssh_channel_poll_timeout(channel, timeout_msec, 0); - if (rc == SSH_ERROR) { - LX_ERROR("polling ssh channel failed for '%s' on '%s'\n", - cmd, a->rag_hostname); - goto out_ssh; - } - if (rc == SSH_EOF) { - rc = SSH_OK; - break; - } - if (rc == SSH_AGAIN || /* Bug fixed since libssh 0.9.5 */ - rc == SSH_OK) { /* Timeout */ - LX_ERROR("no progress updates for '%s' on '%s'\n", - cmd, a->rag_hostname); - rc = -ETIMEDOUT; - goto out_ssh; - } - - rc = ssh_channel_read_nonblocking(channel, buf, 
sizeof(buf)-1, 0); - - if (rc == SSH_ERROR) { - LX_ERROR("reading ssh failed for '%s' on '%s'\n", - cmd, a->rag_hostname); - goto out_ssh; - } - /* Nothing is available or timeout */ - if (rc == SSH_AGAIN || rc == SSH_OK) { - if (ssh_channel_is_eof(channel)) { - rc = SSH_OK; - break; - } - continue; - } + assert(timeout_sec > 0); - buf[rc] = '\0'; - /* LX_DEBUG("ssh read rc=%d '%s'\n", rc, buf); */ - lamigo_mirror_stats_parse(rj, buf, rc); - } /* while */ + rc = lipe_ssh_read_std(channel, a->rag_hostname, timeout_sec, + buf, sizeof(buf), 0, + lamigo_mirror_stats_parse, rj); + if (rc < 0) { + LX_ERROR("Error getting stdout for '%s' on '%s':%d\n", + cmd, a->rag_hostname, rc); + goto out_ssh; + } rc = ssh_channel_get_exit_status(channel); if (rc < 0) { @@ -1005,12 +982,9 @@ static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd, int *pstatus, *pstatus = rc; rc = SSH_OK; - if (stderr_buf) { - nr = ssh_channel_read_nonblocking(channel, stderr_buf, - stderr_size - 1, true); - if (nr > 0) - stderr_buf[nr] = '\0'; - } + if (stderr_buf) + lipe_ssh_read_std(channel, a->rag_hostname, 5, stderr_buf, + stderr_size, 1, NULL, NULL); out_ssh: ssh_channel_send_eof(channel); @@ -1034,6 +1008,7 @@ static void lamigo_agent_check_options(struct resync_agent *agent) agent->rag_has_bwlimit = false; agent->rag_has_stats = false; + agent->rag_extra_options[0] = '\0'; rss = lamigo_get_session(agent); @@ -1042,27 +1017,31 @@ static void lamigo_agent_check_options(struct resync_agent *agent) rc = system(opt.o_mirror_cmd); } else { rc = lipe_ssh_exec(&rss->rss_ctx, opt.o_mirror_cmd, - &status, outbuf, sizeof(outbuf), - errbuf, sizeof(errbuf), - opt.o_ssh_exec_to); + &status, outbuf, sizeof(outbuf), + errbuf, sizeof(errbuf), + opt.o_ssh_exec_to); } - agent->rag_extra_options[0] = '\0'; - if (rc == 0) { - if ((strstr(errbuf, "stats-interval") != NULL) || - (strstr(outbuf, "stats-interval") != NULL)) { - rc = snprintf(agent->rag_extra_options, - sizeof(agent->rag_extra_options) - 1, - 
"--stats --stats-interval=%d", - opt.o_mirror_stats_sec); - agent->rag_has_stats = true; - } - if ((strstr(errbuf, "bandwidth") != NULL) || - (strstr(outbuf, "bandwidth") != NULL)) - agent->rag_has_bwlimit = true; - agent->rag_opts_checked = true; + if (rc != 0) { + LX_ERROR("getting lfs options failed\n"); + goto out; } + if ((strstr(errbuf, "stats-interval") != NULL) || + (strstr(outbuf, "stats-interval") != NULL)) { + rc = snprintf(agent->rag_extra_options, + sizeof(agent->rag_extra_options) - 1, + "--stats --stats-interval=%d", + opt.o_mirror_stats_sec); + agent->rag_has_stats = true; + } + if ((strstr(errbuf, "bandwidth") != NULL) || + (strstr(outbuf, "bandwidth") != NULL)) + agent->rag_has_bwlimit = true; + + agent->rag_opts_checked = true; + +out: lamigo_put_session(agent, rss); } diff --git a/lipe/src/lipe_ssh.c b/lipe/src/lipe_ssh.c index 8430530..57195bd 100644 --- a/lipe/src/lipe_ssh.c +++ b/lipe/src/lipe_ssh.c @@ -60,11 +60,83 @@ out: return rc; } -static int lipe_ssh_session_exec_cmd(ssh_session session, const char *cmd, int *pstatus, - char *stdout_buf, int stdout_size, - char *stderr_buf, int stderr_size)
+/*
+ * Read stdout or stderr of a command running on an ssh channel.
+ * If cb is provided, each chunk read is NUL terminated and passed to cb;
+ * a non-zero cb result stops reading. Otherwise the chunks are
+ * concatenated in buf until EOF or until buf is full.
+ * is_stderr = 0 - read stdout
+ *           = 1 - read stderr
+ * timeout_sec < 0 - infinite timeout
+ * Return (buf is NUL terminated unless -EINVAL is returned):
+ * rc < 0  - error: -EINVAL, -ETIMEDOUT or a libssh error code
+ * rc >= 0 - string length in buf (always 0 when cb is used)
+ */
+int lipe_ssh_read_std(ssh_channel channel, char *host, int timeout_sec,
+		      char *buf, size_t bufsize, int is_stderr,
+		      lipe_ssh_read_std_callback_t cb, void *cbparam)
+{
+	const int timeout_msec = timeout_sec < 0 ? -1 : timeout_sec * 1000;
+	size_t n = 0;
+	int rc = SSH_OK; /* the loop may not run at all: closed or EOF */
+
+	if (!buf || bufsize == 0) /* no room even for the terminator */
+		return -EINVAL;
+
+	while (ssh_channel_is_open(channel) && !ssh_channel_is_eof(channel)) {
+		rc = ssh_channel_poll_timeout(channel, timeout_msec, is_stderr);
+		if (rc == SSH_ERROR) {
+			LX_ERROR("polling ssh channel failed on '%s'\n", host);
+			break;
+		}
+		if (rc == SSH_EOF) {
+			rc = SSH_OK;
+			break;
+		}
+		if (rc == SSH_AGAIN || /* Bug fixed since libssh 0.9.5 */
+		    rc == SSH_OK) { /* Timeout */
+			rc = -ETIMEDOUT;
+			break;
+		}
+
+		rc = ssh_channel_read_nonblocking(channel, &buf[n],
+						  bufsize - n - 1, is_stderr);
+		if (rc == SSH_ERROR) {
+			LX_ERROR("reading ssh std failed on '%s'\n", host);
+			break;
+		}
+
+		/* Poll reported data but nothing was read. Not expected */
+		if (rc == SSH_AGAIN || rc == SSH_OK || rc == SSH_EOF) {
+			rc = SSH_OK;
+			break;
+		}
+
+		if (cb) {
+			buf[rc] = '\0'; /* cb may treat the chunk as a string */
+			if (cb(cbparam, (unsigned char *)buf, rc))
+				break;
+		} else {
+			n += rc;
+			if (n >= bufsize - 1) /* full: do not poll again */
+				break;
+		}
+	} /* while */
+
+	buf[n] = '\0';
+	if (rc >= 0)
+		rc = (int)n;
+
+	return rc;
+}
+
+static int lipe_ssh_session_exec_cmd(ssh_session session, const char *cmd, + int *pstatus, + char *stdout_buf, int stdout_size, + char *stderr_buf, int stderr_size) { ssh_channel channel = NULL; + char *host; int rc, nr; rc = lipe_ssh_session_start_cmd(session, cmd, &channel); @@ -82,21 +154,15 @@ static int lipe_ssh_session_exec_cmd(ssh_session session, const char *cmd, int * *pstatus = rc; rc = SSH_OK; out: - if (stdout_buf) { - stdout_buf[0] = '\0'; - nr = ssh_channel_read_nonblocking(channel, stdout_buf, - stdout_size - 1, false); - if (nr > 0) - stdout_buf[nr] = '\0'; - } + ssh_options_get(session, SSH_OPTIONS_HOST, &host); - if (stderr_buf) { - stderr_buf[0] = '\0'; - nr = ssh_channel_read_nonblocking(channel, stderr_buf, - stderr_size - 1, true); - if (nr > 0) - stderr_buf[nr] = '\0'; - } + if (stdout_buf) + lipe_ssh_read_std(channel, host, 5, stdout_buf, stdout_size, 0, + NULL, NULL); + + if (stderr_buf) + lipe_ssh_read_std(channel, host, 5, stderr_buf, stderr_size, 1, + NULL, NULL); ssh_channel_send_eof(channel); ssh_channel_close(channel); diff --git a/lipe/src/lipe_ssh.h b/lipe/src/lipe_ssh.h index 7c83f9b..c50d3bb 100644 --- a/lipe/src/lipe_ssh.h +++ b/lipe/src/lipe_ssh.h @@ -22,5 +22,9 @@ int lipe_ssh_exec(struct lipe_ssh_context *ctx, const char *cmd, char *errbuf, int errbuf_size, long timeout_sec); int lipe_ssh_start_cmd(struct lipe_ssh_context *ctx, const char *cmd, ssh_channel *pchannel, long timeout_sec); +typedef int (*lipe_ssh_read_std_callback_t)(void *, unsigned char *, int); +int lipe_ssh_read_std(ssh_channel channel, char *host, int timeout_sec, + char *buf, size_t bufsize, int is_stderr, + lipe_ssh_read_std_callback_t cb, void 
*cbparam); #endif /* _LIPE_SSH_H_ */ diff --git a/lustre/tests/hot-pools.sh b/lustre/tests/hot-pools.sh index ef84187..5c74fcd 100755 --- a/lustre/tests/hot-pools.sh +++ b/lustre/tests/hot-pools.sh @@ -2356,9 +2356,8 @@ test_75a() { local client=$HOSTNAME # The test is not valid if stats-interval is not supported # The test is going to use only client machine - if lfs mirror extend | grep -q stats-interval ; then + lfs mirror extend 2>&1 | grep -q stats-interval || skip "Client $client requires 'lfs mirror extend --stats-interval' support" - fi local tf=$DIR/$tfile @@ -2420,9 +2419,8 @@ test_75b() { local client=$HOSTNAME # The test is not valid if stats-interval is not supported. # The test is going to use only client machine. - if lfs mirror extend | grep -q stats-interval ; then + lfs mirror extend 2>&1 | grep -q stats-interval || skip "Client $client requires 'lfs mirror extend --stats-interval' support" - fi local tf=$DIR/$tfile @@ -2493,9 +2491,8 @@ test_75c() { local client=$HOSTNAME # The test not valid if stats-interval is not supported # The test is going to use only client machine. - if lfs mirror extend | grep -q stats-interval ; then + lfs mirror extend 2>&1 | grep -q stats-interval || skip "Client $client requires 'lfs mirror extend --stats-interval' support" - fi local tf=$DIR/$tfile local lamigo_age=5 @@ -2507,6 +2504,7 @@ test_75c() { LAMIGO_STATS_MISSING=1 \ LAMIGO_CLIENTS=$client \ LAMIGO_AGT_NODES="" \ + LAMIGO_CLIENTS=$client \ LAMIGO_AGE=$lamigo_age \ LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M >/dev/null" \ start_lamigo_service -- 1.8.3.1