#define DEF_FAST_POOL_MAX_USED 30 /* open for migration if % space used is less than */
#define DEF_SLOW_POOL_MAX_USED 90 /* open for migration if % space used is less than */
#define DEF_SSH_EXEC_TO_SECS (10 * 60) /* ssh exec timeout */
-#define DEF_MIRROR_STATS_SEC 3 /* mirror extend/resync stats interval */
+#define DEF_MIRROR_STATS_SEC 6 /* mirror extend/resync stats interval */
+#define DEF_STATS_MISSED 50 /* stats missed before timeout */
#define LAMIGO_USERFILE "/var/lib/lamigo-%s.chlg"
#define LAMIGO_DUMPFILE "/var/run/lamigo-%s.stats"
.o_alr_cooldown_k = DEF_COOLDOWN_K,
.o_alr_ofd_interval = 0,
.o_mirror_stats_sec = DEF_MIRROR_STATS_SEC,
+ .o_stats_missed = DEF_STATS_MISSED,
.o_alr_hot_fraction = DEF_HOT_FRACTION,
.o_alr_hot_after_idle = DEF_HOT_AFTER_IDLE,
.o_include_dom = 0,
return 0;
buffer = xstrdup(orig);
-
if (!yaml_parser_initialize(&parser)) {
LX_ERROR("can't initialize yaml parser\n");
rj->rj_dont_parse = 1;
pthread_mutex_unlock(&a->rag_ssh_lock);
}
-static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd,
- int *pstatus, char *outbuf, int outbuf_size,
- char *errbuf, int errbuf_size)
+static int lamigo_exec_cmd(struct resync_job *rj, const char *cmd, int *pstatus,
+ char *stderr_buf, int stderr_size)
{
- struct resync_agent *a = rj->rj_agent ;
+ struct resync_agent *a = rj->rj_agent;
struct resync_ssh_session *rss;
- int rc, timeout;
+ int rc, timeout_msec;
char cmd1[PATH_MAX * 2];
ssh_channel channel = NULL;
char buf[1024];
- int nr, last_read;
+ int nr;
rss = lamigo_get_session(a);
goto out;
}
- timeout = opt.o_ssh_exec_to;
- if (a->rag_has_stats)
- timeout = opt.o_mirror_stats_sec * 2;
-
rc = lipe_ssh_start_cmd(&rss->rss_ctx, cmd1, &channel,
LIPE_SSH_TIMEOUT_INFINITE);
- if (rc != SSH_OK)
+ if (rc != SSH_OK) {
+ LX_ERROR("Starting '%s' on '%s' failed\n",
+ cmd, a->rag_hostname);
goto out;
+ }
- assert(timeout > 0);
- last_read = time(NULL);
-
- while (rc >= 0) {
- rc = ssh_channel_read_timeout(channel, buf,
- sizeof(buf) - 1, 0,
- timeout * 1000);
- if (rc > 0) {
- last_read = time(NULL);
- buf[rc] = '\0';
- lamigo_mirror_stats_parse(rj, buf, rc);
- continue;
+ timeout_msec = opt.o_ssh_exec_to * 1000;
+ if (a->rag_has_stats)
+ timeout_msec = opt.o_mirror_stats_sec * opt.o_stats_missed * 1000;
+
+ assert(timeout_msec > 0);
+
+ while (ssh_channel_is_open(channel) && !ssh_channel_is_eof(channel)) {
+ rc = ssh_channel_poll_timeout(channel, timeout_msec, 0);
+ if (rc == SSH_ERROR) {
+ LX_ERROR("polling ssh channel failed for '%s' on '%s'\n",
+ cmd, a->rag_hostname);
+ goto out_ssh;
}
- if (ssh_channel_is_eof(channel)) {
- /* end of execution */
+ if (rc == SSH_EOF) {
+ rc = SSH_OK;
break;
}
- if (rc == 0 && (time(NULL) - last_read) >= timeout) {
- LX_ERROR("error: no progress updates for %s\n", cmd1);
+ if (rc == SSH_AGAIN || /* Bug fixed since libssh 0.9.5 */
+ rc == SSH_OK) { /* Timeout */
+ LX_ERROR("no progress updates for '%s' on '%s'\n",
+ cmd, a->rag_hostname);
rc = -ETIMEDOUT;
goto out_ssh;
}
- }
+
+ rc = ssh_channel_read_nonblocking(channel, buf, sizeof(buf)-1, 0);
+
+ if (rc == SSH_ERROR) {
+ LX_ERROR("reading ssh failed for '%s' on '%s'\n",
+ cmd, a->rag_hostname);
+ goto out_ssh;
+ }
+ /* Nothing is available or timeout */
+ if (rc == SSH_AGAIN || rc == SSH_OK) {
+ if (ssh_channel_is_eof(channel)) {
+ rc = SSH_OK;
+ break;
+ }
+ continue;
+ }
+
+ buf[rc] = '\0';
+ /* LX_DEBUG("ssh read rc=%d '%s'\n", rc, buf); */
+ lamigo_mirror_stats_parse(rj, buf, rc);
+ } /* while */
rc = ssh_channel_get_exit_status(channel);
if (rc < 0) {
- LX_ERROR("lipe_ssh_session_start_cmd failed: %d: %s\n",
- rc, ssh_get_error(rss->rss_ctx.lsc_session));
- rc = SSH_ERROR;
+ LX_ERROR("ssh_channel_get_exit_status failed for '%s' on '%s' (%s)\n",
+ cmd, a->rag_hostname,
+ ssh_get_error(rss->rss_ctx.lsc_session));
goto out_ssh;
}
*pstatus = rc;
rc = SSH_OK;
+ if (stderr_buf) {
+ nr = ssh_channel_read_nonblocking(channel, stderr_buf,
+ stderr_size - 1, true);
+ if (nr > 0)
+ stderr_buf[nr] = '\0';
+ }
+
out_ssh:
ssh_channel_send_eof(channel);
ssh_channel_close(channel);
ssh_channel_free(channel);
out:
- if (rc)
- LX_ERROR("error executing ssh command '%s' on '%s'",
- cmd, a->rag_hostname);
-
lamigo_put_session(a, rss);
-
return rc;
}
agent->rag_extra_options[0] = '\0';
if (rc == 0) {
- if (strstr(errbuf, "stats-interval") != NULL) {
+ if ((strstr(errbuf, "stats-interval") != NULL) ||
+ (strstr(outbuf, "stats-interval") != NULL)) {
rc = snprintf(agent->rag_extra_options,
sizeof(agent->rag_extra_options) - 1,
"--stats --stats-interval=%d",
opt.o_mirror_stats_sec);
agent->rag_has_stats = true;
}
- if (strstr(errbuf, "bandwidth") != NULL)
+ if ((strstr(errbuf, "bandwidth") != NULL) ||
+ (strstr(outbuf, "bandwidth") != NULL))
agent->rag_has_bwlimit = true;
agent->rag_opts_checked = true;
}
{
struct resync_job *rj = (struct resync_job *)args;
struct resync_agent *agent = rj->rj_agent;
- char outbuf[4096], errbuf[4096];
+ char errbuf[4096];
int resync = rj->rj_resync;
char cmd[PATH_MAX * 2];
int status = INT_MAX;
goto out;
}
+ errbuf[0] = '\0';
/* rc < 0 means an ssh error. Otherwise command exit status is
* in status. Mask common exit statuses. */
- rc = lamigo_exec_cmd(rj, cmd, &status, outbuf, sizeof(outbuf),
- errbuf, sizeof(errbuf));
+ rc = lamigo_exec_cmd(rj, cmd, &status, errbuf, sizeof(errbuf));
LX_DEBUG("exec command '%s' on '%s': rc = %d, status = %d\n",
cmd, agent->rag_hostname, rc, status);
if (rc < 0 ||
(!rj->rj_setprefer && status != 0 && status != EBUSY && status != ENODATA)) {
LX_ERROR("command '%s' on '%s' failed: rc = %d, status = %d\n",
cmd, agent->rag_hostname, rc, status);
- if (outbuf[0] != '\0')
- LX_ERROR("STDOUT: %s\n", outbuf);
- if (errbuf[0] != '\0')
- LX_ERROR("STDERR: %s\n", errbuf);
+ if (rc >= 0)
+ LX_ERROR("STDERR: '%s'\n", errbuf);
}
out:
/* notify the main thread about completion */
* ssh was doing OK and the changelog got new
* OPEN/CLOSE events which will cause another try
*/
- if (retval == -ETIMEDOUT)
+ if (retval == -ETIMEDOUT) {
stats.s_job_timedout++;
- if (retval == SSH_ERROR) {
+ } else if (retval == SSH_ERROR) {
lipe_list_add_tail(&rj->rj_list, &lamigo_failed_job_list);
lamigo_jobs_delayed++;
rj = NULL;
LAMIGO_OPT_TGT_FREE, /* == LAMIGO_OPT_SLOW_POOL_MAX_USED + math + warning */
LAMIGO_OPT_LOG_TIMESTAMPS,
LAMIGO_OPT_VERSION,
+ LAMIGO_OPT_STATS_MISSED,
};
static struct option options[] = {
{ "ssh-exec-timeout", required_argument, NULL, LAMIGO_OPT_SSH_EXEC_TO },
{ "ssh-log-verbosity", required_argument, NULL, LAMIGO_OPT_SSH_LOG_VERBOSITY },
{ "statfs-refresh", required_argument, NULL, LAMIGO_OPT_STATFS_REFRESH },
+ { "stats-missed", required_argument, NULL, LAMIGO_OPT_STATS_MISSED },
{ "tgt", required_argument, NULL, 't'},
{ "tgt-free", required_argument, NULL, LAMIGO_OPT_TGT_FREE},
{ "thread-number", required_argument, NULL, 'n' },
case LAMIGO_OPT_VERSION:
lipe_version();
exit(EXIT_SUCCESS);
+ case LAMIGO_OPT_STATS_MISSED:
+ opt.o_stats_missed = atoi(optarg);
+ if (opt.o_stats_missed < 1 || opt.o_stats_missed > 1000)
+ LX_FATAL("invalid stats-missed '%s'\n", optarg);
+ break;
case 'a':
opt.o_min_age = strtol(optarg, &endptr, 10);
if (*endptr != '\0' || opt.o_min_age < 5)
LAMIGO_EXTRA=${LAMIGO_EXTRA:-""}
LAMIGO_CLIENTS=${CLIENTS:-"$HOSTNAME"}
LAMIGO_CLIENTS=${LAMIGO_CLIENTS//,/ }
+LAMIGO_STATS_MISSING=${LAMIGO_STATS_MISSING:-""}
declare -a LAMIGO_MDT
declare -a LAMIGO_MDT_FACET
cmd+=${LAMIGO_PROG_INTV:+" --progress-interval=$LAMIGO_PROG_INTV"}
cmd+=${LAMIGO_MIRROR_CMD:+" --mirror-cmd=\"$LAMIGO_MIRROR_CMD\""}
cmd+=${LAMIGO_RESYNC_CMD:+" --resync-cmd=\"$LAMIGO_RESYNC_CMD\""}
+ cmd+=${LAMIGO_STATS_MISSING:+" --stats-missed=\"$LAMIGO_STATS_MISSING\""}
cmd+=${LAMIGO_DUMP:+" -w ${LAMIGO_DUMP}.$mdt"}
[[ -z \\\"$LAMIGO_EXTRA\\\" ]] ||
echo \\\"$LAMIGO_EXTRA\\\" >> $cfg_file;
[[ -z \\\"$LAMIGO_MIRROR_CMD\\\" ]] ||
- echo mirror-cmd=$LAMIGO_MIRROR_CMD >> $cfg_file;
+ echo mirror-cmd=\\\"$LAMIGO_MIRROR_CMD\\\" >> $cfg_file;
[[ -z \\\"$LAMIGO_RESYNC_CMD\\\" ]] ||
- echo resync-cmd=$LAMIGO_RESYNC_CMD >> $cfg_file;
+ echo resync-cmd=\\\"$LAMIGO_RESYNC_CMD\\\" >> $cfg_file;
[[ -z \\\"$LAMIGO_DUMP\\\" ]] ||
echo dump=\\\"${LAMIGO_DUMP}.$mdt\\\" >> $cfg_file;
+ [[ -z \\\"$LAMIGO_STATS_MISSING\\\" ]] ||
+ echo stats-missed=\\\"$LAMIGO_STATS_MISSING\\\" >> $cfg_file;
! $LAMIGO_DEBUG || echo debug >> $cfg_file;
! $LAMIGO_RESCAN || echo rescan >> $cfg_file;
! $LAMIGO_TIMESTAMPS || echo timestamps >> $cfg_file;
run_test 74 "ofd keepalive message"
test_75a() {
+ # The test is not valid if stats-interval is not supported
+ # The test is going to use only client machine
+ if lfs mirror extend | grep -q stats-interval ; then
+ skip "requires 'lfs mirror extend --stats-interval' support"
+ fi
+
local tf=$DIR/$tfile
init_hot_pools_env
+ LAMIGO_AGT_NODES="" \
LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M" \
start_lamigo_cfg
check_lamigo_is_started || {
run_test 75a "lamigo to parse mirroring progress"
test_75b() {
+ # The test is not valid if stats-interval is not supported.
+ # The test is going to use only client machine.
+ if lfs mirror extend | grep -q stats-interval ; then
+ skip "requires 'lfs mirror extend --stats-interval' support"
+ fi
+
local tf=$DIR/$tfile
init_hot_pools_env
+ LAMIGO_AGT_NODES="" \
LAMIGO_RESYNC_CMD="lfs mirror resync -W1M" start_lamigo_cfg
check_lamigo_is_started || {
local facet=${LAMIGO_MDT_FACET[0]}
run_test 75b "lamigo to parse mirror resync progress"
test_75c() {
+ # The test not valid if stats-interval is not supported
+ # The test is going to use only client machine.
+ if lfs mirror extend | grep -q stats-interval ; then
+ skip "requires 'lfs mirror extend --stats-interval' support"
+ fi
+
local tf=$DIR/$tfile
+ local lamigo_age=5
init_hot_pools_env
# redirect stats to /dev/null
- LAMIGO_AGE=5 LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M \>/dev/null" \
+ # Use only client machine as a replicating agent
+ LAMIGO_STATS_MISSING=1 \
+ LAMIGO_AGT_NODES="" \
+ LAMIGO_AGE=$lamigo_age \
+ LAMIGO_MIRROR_CMD="lfs mirror extend -N -W1M >/dev/null" \
start_lamigo_service
check_lamigo_is_started || error "failed to start lamigo"
stack_trap stop_lamigo_service
local nr
nr=$(dump_one_lamigo_stats | awk '/job-timedout:/{print $2}')
(( $nr == 0 )) || error "unexptected job-timedout: $nr"
- for ((i=0; i < (($LAMIGO_AGE*2)); i++)); do
+
+ for ((i=0; i < (($lamigo_age*6)); i++)); do
nr=$(dump_one_lamigo_stats | awk '/job-timedout:/{print $2}')
- (( $nr > 0 )) && break
dump_one_lamigo_stats | grep job-timedout
+ (( $nr > 0 )) && break
sleep 2
done
(( $nr > 0 )) || error "no timedout jobs"