/* periodical message from access_log reader */
#define KEEP_ALIVE_MSG "# keepalive"
+#define SSH_POLL_TIMEOUT_SEC (opt.o_alr_ofd_interval * 8)
+
struct alr_agent {
char *ala_host;
struct lipe_list_head ala_list;
pthread_t ala_pid;
time_t ala_last_msg_received;
time_t ala_last_alr_restarted;
+ int keepalive_interval;
};
static LIPE_LIST_HEAD(alr_agent_list);
if (strncmp(line, KEEP_ALIVE_MSG, sizeof(KEEP_ALIVE_MSG)) == 0) {
/* Found "# keepalive" */
restart_log_reader_on_timeout = true;
+ LX_DEBUG("keepalive msg from host:'%s'\n", host);
return;
}
/* extract full lines from the buffer and try to parse */
do {
-
p = memchr(buf + offset, '\n', size);
if (!p) {
/* can't find full line */
return 0;
}
+/*
+ * Return:
+ * <= 0 - SSH error or SSH timeout
+ * == 1 - feature has been set (could be off or on)
+ */
+static int init_features(struct alr_agent *ala)
+{
+ char cmd[PATH_MAX];
+ ssh_channel channel = NULL;
+ int i;
+ char buffer[8 * 1024];
+ int rc, strtst;
+
+ ala->keepalive_interval = 0;
+
+ snprintf(cmd, sizeof(cmd), "%s --help", opt.o_alr_cmd);
+
+ rc = lipe_ssh_start_cmd_timeout(&ala->ala_ctx, cmd, &channel, 5);
+ if (rc != SSH_OK) {
+ LX_ERROR("cannot start '%s' on host '%s': rc = %d\n",
+ cmd, ala->ala_host, rc);
+ return -1;
+ }
+
+ rc = ssh_channel_read_timeout(channel, buffer, sizeof(buffer)-1,
+ 0, 5 * 1000);
+ if (rc <= 0)
+ goto out;
+
+ buffer[rc] = 0;
+
+ strtst = strstr(buffer, "--keepalive") != NULL;
+ if (strtst) {
+ ala->keepalive_interval = SSH_POLL_TIMEOUT_SEC/2;
+ if (ala->keepalive_interval == 0)
+ ala->keepalive_interval = 1;
+ }
+ LX_DEBUG("keepalive option %d\n", strtst); /* Used in hot-pools.sh */
+out:
+ ssh_channel_send_eof(channel);
+ ssh_channel_close(channel);
+ ssh_channel_free(channel);
+
+ return rc;
+}
+
+static void lamigo_alr_init_features(struct alr_agent *ala)
+{
+ int i;
+
+ for (i = 0; i < 5; i++, sleep(PREVENT_SSH_BUSY_LOOP_SEC)) {
+ if (init_features(ala) > 0)
+ return;
+ }
+
+ /* All requested features failed */
+ LX_DEBUG("keepalive option -1\n"); /* Used in hot-pools.sh */
+}
/* Run ofd_access_log_reader once on the specified OSS.
*
time_t last_checked = time(NULL);
int rc, offset = 0, received = 0;
char cmd[PATH_MAX];
+ char keepalivestr[32] = "";
char buffer[16 * 1024];
/* We do not try to read stderr from the access log reader. So
* we must redirect it on the remote side to avoid
* unintentional blocking. See the comment in
- * lipe_ssh_exec(). */
+ * lipe_ssh_exec().
+ */
+ if (ala->keepalive_interval)
+ snprintf(keepalivestr, sizeof(keepalivestr), " --keepalive=%d",
+ ala->keepalive_interval);
snprintf(cmd, sizeof(cmd),
- "%s -i %d -I %d %s 2> /dev/null",
- opt.o_alr_cmd, opt.o_alr_ofd_interval,
+ "%s%s -i %d -I %d %s 2> /dev/null",
+ opt.o_alr_cmd, keepalivestr, opt.o_alr_ofd_interval,
mdtidx, opt.o_alr_extra_args);
ala->ala_last_alr_restarted = time(NULL);
while (ssh_channel_is_open(channel) && !ssh_channel_is_eof(channel)) {
/* Wait at max 4*ofd_access_log_reader batch interval */
- const int timeout_sec = opt.o_alr_ofd_interval * 8;
+ const int timeout_msec = SSH_POLL_TIMEOUT_SEC * 1000;
time_t now;
- rc = ssh_channel_poll_timeout(channel, timeout_sec * 1000, 0);
+ rc = ssh_channel_poll_timeout(channel, timeout_msec, 0);
if (rc == SSH_ERROR) {
LX_ERROR("polling ssh channel from '%s' failed\n",
ala->ala_host);
rc = SSH_OK;
if (restart_log_reader_on_timeout) {
LX_ERROR("waiting message from '%s' exceeded %dsec\n",
- ala->ala_host, timeout_sec);
+ ala->ala_host, timeout_msec/1000);
}
break;
}
if (channel != NULL) {
ssh_channel_send_eof(channel);
ssh_channel_close(channel);
+ ssh_channel_free(channel);
}
- ssh_channel_free(channel);
-
return rc;
} /* lamigo_alr_agent_run */
static void *lamigo_alr_data_collection_thread(void *arg)
{
struct alr_agent *ala = arg;
+ lamigo_alr_init_features(ala);
while (1) {
time_t since_last_restart;
}
run_test 73 "check strip size & count after replication"
+get_lamigo_keepalive() {
+ local facet=${LAMIGO_MDT_FACET[0]}
+ local log_file=$(lamigo_logfile $facet)
+ # 1679963639.802665 testfs-MDT0000: DEBUG: keepalive option 1
+ local awk_string="awk '/DEBUG: keepalive option/ { print \\\$6 }' ${log_file}"
+
+ local keepalive_opt=$(do_facet $facet $awk_string)
+
+ echo $keepalive_opt
+}
+
+verify_no_keepalive() {
+ local keepalive_opt=$1
+
+ (( keepalive_opt != 0 )) ||
+ echo "No OFD keepalive message option detected"
+
+ (( keepalive_opt != 1 )) ||
+ error "Not expected OFD keepalive message option"
+ return 0
+}
+
+verify_keepalive() {
+ local keepalive_opt=$1
+ local lamigo_facet=${LAMIGO_MDT_FACET[0]}
+ local lamigo_log_file=$(lamigo_logfile $lamigo_facet)
+ local count
+ local grep_string
+
+ (( keepalive_opt != 0 )) ||
+ error "No OFD keepalive message option detected"
+
+ (( keepalive_opt != 1 )) ||
+ echo "OFD keepalive message option detected"
+
+ # 1680134703.390641 testfs-MDT0000: DEBUG: keepalive msg from host:'ost-centOS8'
+
+ grep_string="grep -c 'DEBUG: keepalive msg from host' $lamigo_log_file"
+ count=$(do_facet $lamigo_facet $grep_string)
+
+ (( count >= 2 && count <= 4 )) ||
+ error "keepalive msg received $count. Expected 2..4"
+}
+
+test_74() {
+ local oss
+ local keepalive_opt
+
+ # Firstly determine whether ofd_access_log_reader has keepalve message option
+ local ofd_keepalive=$(do_facet ost1 ofd_access_log_reader --help |
+ grep keepalive)
+ init_hot_pools_env
+
+ for oss in $(osts_nodes); do
+ break
+ done
+
+ do_nodes $(comma_list $(osts_nodes)) \
+ "$LCTL set_param obdfilter.${FSNAME}-OST*.access_log_size=4096 >&/dev/null"
+
+ # Secondly determine whether lamigo detects keepalve message option
+
+ LAMIGO_EXTRA_OPT="--oss=$oss --ofd-interval=3" start_lamigo_cmd
+ check_lamigo_is_started || error "failed to start lamigo"
+ stack_trap stop_lamigo_cmd
+
+ sleep 30
+
+ keepalive_opt=$(get_lamigo_keepalive)
+
+ [[ -n $keepalive_opt ]] ||
+ error "could not determine lamigo keepalive option: not found"
+
+ (( $keepalive_opt < 0 )) &&
+ error "could not determine lamigo keepalive option: ssh error"
+
+ if [[ -z $ofd_keepalive ]]; then
+ verify_no_keepalive $keepalive_opt
+ else
+ verify_keepalive $keepalive_opt
+ fi
+}
+run_test 74 "ofd keepalive message"
+
complete $SECONDS
check_and_cleanup_lustre
exit_status
}
run_test 165f "ofd_access_log_reader --exit-on-close works"
+test_165g() {
+ local count
+ local trace=$TMP/${tfile}.trace
+ local ofd_keepalive=$(do_facet ost1 ofd_access_log_reader --help |
+ grep keepalive)
+
+ [[ -n "$ofd_keepalive" ]] || skip "ALR keepalive message unsupported"
+
+ setup_165
+ do_facet ost1 timeout 60 ofd_access_log_reader \
+ --keepalive=3 --batch-interval=4 --debug=- > "${trace}" &
+ sleep 40
+ stop ost1
+
+ do_facet ost1 killall -TERM ofd_access_log_reader
+ wait
+ rc=$?
+
+ if ((rc != 0)); then
+ error "ofd_access_log_reader exited with rc = '${rc}'"
+ fi
+
+ count=$(grep -c "send keepalive" ${trace})
+
+ # Duration 40 seconds / 4 secs interval = 10 times or so
+ (( count >= 9 && count <= 11 )) ||
+ error "keepalive msg received $count. Expected 9..11"
+}
+run_test 165g "ofd_access_log_reader --keepalive works"
+
test_169() {
# do directio so as not to populate the page cache
log "creating a 10 Mb file"
#include "lstddef.h"
#include "ofd_access_batch.h"
+static time_t when_last_printed;
+
/* XXX Weird param order to be consistent with list_replace_init(). */
static inline void list_replace_init(struct list_head *old_node,
struct list_head *new_node)
(unsigned long long)alre->alre_segment_count[d],
(unsigned long long)alre->alre_count[d],
(d == ALR_READ) ? 'r' : 'w');
+ when_last_printed = time(NULL);
}
struct alr_thread_arg {
pthread_mutex_t *file_mutex;
};
+static void alre_print_keepalive(struct alr_thread_arg *aa)
+{
+ int rc = pthread_mutex_lock(aa->file_mutex);
+
+ if (rc != 0)
+ FATAL("cannot lock batch file: %s\n",strerror(rc));
+
+ fprintf(aa->file, "# keepalive\n");
+ when_last_printed = time(NULL);
+ DEBUG("send keepalive\n");
+
+ rc = pthread_mutex_unlock(aa->file_mutex);
+ if (rc != 0)
+ FATAL("cannot unlock batch file: %s\n",strerror(rc));
+}
+
void *alr_sort_and_print_thread(void *arg)
{
struct alr_entry *alre, *next;
goto out;
sa = calloc(nr, sizeof(*sa));
- if (!sa) {
- fprintf(stderr, "cannot allocate memory for sorting\n");
- exit(1);
- }
+ if (!sa)
+ FATAL("cannot allocate memory for sorting\n");
i = 0;
list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
/* Prevent jumbled output from multiple concurrent sort and
* print threads. */
rc = pthread_mutex_lock(aa->file_mutex);
- if (rc != 0) {
- fprintf(stderr, "cannot lock batch file: %s\n",
- strerror(rc));
- exit(1);
- }
+ if (rc != 0)
+ FATAL("cannot lock batch file: %s\n",strerror(rc));
/* there might be lots of items at @cut, but we want to limit total
* output. so the first loop dumps all items > @cut and the second
}
rc = pthread_mutex_unlock(aa->file_mutex);
- if (rc != 0) {
- fprintf(stderr, "cannot unlock batch file: %s\n",
- strerror(rc));
- exit(1);
- }
+ if (rc != 0)
+ FATAL("cannot unlock batch file: %s\n", strerror(rc));
out:
+ /* If nothing printed - send keepalive */
+ if (keepalive_interval) {
+ if (time(NULL) - when_last_printed > keepalive_interval)
+ alre_print_keepalive(aa);
+ }
+
fflush(aa->file);
list_for_each_entry_safe(alre, next, tmp, alre_fid_hash_node.fhn_node) {
#include <pthread.h>
#include <sys/types.h>
#include <linux/types.h>
+#include <errno.h>
+
+extern FILE *debug_file;
+extern FILE *trace_file;
+
+#define DEBUG(fmt, args...) \
+ do { \
+ if (debug_file != NULL) \
+ fprintf(debug_file, "DEBUG %s:%d: "fmt, __func__, __LINE__, ##args); \
+ } while (0)
+
+#define TRACE(fmt, args...) \
+ do { \
+ if (trace_file != NULL) \
+ fprintf(trace_file, "TRACE "fmt, ##args); \
+ } while (0)
+
+#define DEBUG_D(x) DEBUG("%s = %"PRIdMAX"\n", #x, (intmax_t)x)
+#define DEBUG_P(x) DEBUG("%s = %p\n", #x, x)
+#define DEBUG_S(x) DEBUG("%s = '%s'\n", #x, x)
+#define DEBUG_U(x) DEBUG("%s = %"PRIuMAX"\n", #x, (uintmax_t)x)
+
+#define ERROR(fmt, args...) \
+ fprintf(stderr, "%s: "fmt, program_invocation_short_name, ##args)
+
+#define FATAL(fmt, args...) \
+ do { \
+ ERROR("FATAL: "fmt, ##args); \
+ exit(EXIT_FAILURE); \
+ } while (0)
+
+
struct lu_fid;
struct alr_batch;
-
+extern unsigned long keepalive_interval;
struct alr_batch *alr_batch_create(unsigned int shift);
void alr_batch_destroy(struct alr_batch *alrb);
int alr_batch_add(struct alr_batch *alrb, const char *obd_name,
/* TODO fsname filter */
-static FILE *debug_file;
-static FILE *trace_file;
-
-#define DEBUG(fmt, args...) \
- do { \
- if (debug_file != NULL) \
- fprintf(debug_file, "DEBUG %s:%d: "fmt, __func__, __LINE__, ##args); \
- } while (0)
-
-#define TRACE(fmt, args...) \
- do { \
- if (trace_file != NULL) \
- fprintf(trace_file, "TRACE "fmt, ##args); \
- } while (0)
-
-#define DEBUG_D(x) DEBUG("%s = %"PRIdMAX"\n", #x, (intmax_t)x)
-#define DEBUG_P(x) DEBUG("%s = %p\n", #x, x)
-#define DEBUG_S(x) DEBUG("%s = '%s'\n", #x, x)
-#define DEBUG_U(x) DEBUG("%s = %"PRIuMAX"\n", #x, (uintmax_t)x)
-
-#define ERROR(fmt, args...) \
- fprintf(stderr, "%s: "fmt, program_invocation_short_name, ##args)
-
-#define FATAL(fmt, args...) \
- do { \
- ERROR("FATAL: "fmt, ##args); \
- exit(EXIT_FAILURE); \
- } while (0)
+FILE *debug_file;
+FILE *trace_file;
enum {
ALR_EXIT_SUCCESS = INT_MIN + EXIT_SUCCESS,
#define P_ALR_LOG(al) \
P_ALR_DEV(&(al)->alr_dev), major((al)->alr_rdev), minor((al)->alr_rdev)
+unsigned long keepalive_interval;
+
static void alr_dev_free(int epoll_fd, struct alr_dev *ad)
{
if (ad == NULL)
" -e, --exit-on-close exit on close of all log devices\n"
" -I, --mdt-index-filter=INDEX set log MDT index filter to INDEX\n"
" -h, --help display this help and exit\n"
+" --keepalive=INTERVAL print keepalive message every INTERVAL seconds\n"
" -l, --list print YAML list of available access logs\n"
" -d, --debug[=FILE] print debug messages to FILE (stderr)\n"
" -s, --stats=FILE print stats messages to FILE (stderr)\n"
{ .name = "batch-interval", .has_arg = required_argument, .val = 'i', },
{ .name = "batch-offset", .has_arg = required_argument, .val = 'o', },
{ .name = "exit-on-close", .has_arg = no_argument, .val = 'e', },
+ { .name = "keepalive", .has_arg = required_argument, .val = 'k' },
{ .name = "mdt-index-filter", .has_arg = required_argument, .val = 'I' },
{ .name = "debug", .has_arg = optional_argument, .val = 'd', },
{ .name = "help", .has_arg = no_argument, .val = 'h', },
errno != 0)
FATAL("invalid batch interval '%s'\n", optarg);
break;
+ case 'k':
+ errno = 0;
+ keepalive_interval = strtoll(optarg, NULL, 0);
+ if (keepalive_interval > 1048576 || errno != 0)
+ FATAL("invalid keepalive message interval '%s'\n", optarg);
+ break;
case 'o':
errno = 0;
batch_offset = strtoll(optarg, NULL, 0);
trace_file = stdout;
} else {
trace_file = fopen(optarg, "a");
- if (debug_file == NULL)
+ if (trace_file == NULL)
FATAL("cannot open debug file '%s': %s\n",
optarg, strerror(errno));
}
exit(EXIT_FAILURE);
}
}
-
if (batch_interval > 0) {
alr_batch = alr_batch_create(-1);
if (alr_batch == NULL)