Whamcloud - gitweb
EX-4141 lipe: keepalive message from ofd_access_log_reader
authorAlexandre Ioffe <aioffe@ddn.com>
Thu, 23 Mar 2023 06:39:12 +0000 (23:39 -0700)
committerAndreas Dilger <adilger@whamcloud.com>
Tue, 25 Apr 2023 03:58:04 +0000 (03:58 +0000)
Lamigo checks ofd_access_log_reader for keepalive message
feature availability. Start ofd_access_log_reader with keepalive
message option if it is available.
Added tests for lamigo and ofd_access_log_reader to
test keepalive message feature
Fixed a bug when trace file is opened.
Moved FATAL, ERROR, DEBUG macros to include and
Used FATAL macro wherever possible

Signed-off-by: Alexandre Ioffe <aioffe@ddn.com>
Test-Parameters: trivial testlist=hot-pools,sanity
Test-Parameters: trivial testlist=hot-pools,sanity
Test-Parameters: trivial testlist=hot-pools,sanity
Change-Id: I0f218e2394c9d0ab6cd425860ba79956a10cbd58
Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/50389
Reviewed-by: Jian Yu <yujian@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
lipe/src/lamigo_alr.c
lustre/tests/hot-pools.sh
lustre/tests/sanity.sh
lustre/utils/ofd_access_batch.c
lustre/utils/ofd_access_batch.h
lustre/utils/ofd_access_log_reader.c

index 0bd9525..a37eae4 100644 (file)
@@ -42,6 +42,8 @@
 /* periodical message from access_log reader */
 #define KEEP_ALIVE_MSG "# keepalive"
 
+#define SSH_POLL_TIMEOUT_SEC (opt.o_alr_ofd_interval * 8)
+
 struct alr_agent {
        char *ala_host;
        struct lipe_list_head ala_list;
@@ -49,6 +51,7 @@ struct alr_agent {
        pthread_t ala_pid;
        time_t ala_last_msg_received;
        time_t ala_last_alr_restarted;
+       int keepalive_interval;
 };
 
 static LIPE_LIST_HEAD(alr_agent_list);
@@ -186,6 +189,7 @@ static void lamigo_alr_parse_one(const char *host, const char *line)
        if (strncmp(line, KEEP_ALIVE_MSG, sizeof(KEEP_ALIVE_MSG)) == 0) {
                /* Found "# keepalive" */
                restart_log_reader_on_timeout = true;
+               LX_DEBUG("keepalive msg from host:'%s'\n", host);
                return;
        }
 
@@ -235,7 +239,6 @@ static int lamigo_alr_parse(const char *host, char *buf, int size, int *received
 
        /* extract full lines from the buffer and try to parse */
        do {
-
                p = memchr(buf + offset, '\n', size);
                if (!p) {
                        /* can't find full line */
@@ -253,6 +256,64 @@ static int lamigo_alr_parse(const char *host, char *buf, int size, int *received
 
        return 0;
 }
+/*
+ *     Return:
+ *             <= 0 - SSH error or SSH timeout
+ *             == 1 - feature has been set (could be off or on)
+ */
+static int init_features(struct alr_agent *ala)
+{
+       char cmd[PATH_MAX];
+       ssh_channel channel = NULL;
+       int i;
+       char buffer[8 * 1024];
+       int rc, strtst;
+
+       ala->keepalive_interval = 0;
+
+       snprintf(cmd, sizeof(cmd), "%s --help", opt.o_alr_cmd);
+
+       rc = lipe_ssh_start_cmd_timeout(&ala->ala_ctx, cmd, &channel, 5);
+       if (rc != SSH_OK) {
+               LX_ERROR("cannot start '%s' on host '%s': rc = %d\n",
+                        cmd, ala->ala_host, rc);
+               return -1;
+       }
+
+       rc = ssh_channel_read_timeout(channel, buffer, sizeof(buffer)-1,
+                                     0, 5 * 1000);
+       if (rc <= 0)
+               goto out;
+
+       buffer[rc] = 0;
+
+       strtst = strstr(buffer, "--keepalive") != NULL;
+       if (strtst) {
+               ala->keepalive_interval = SSH_POLL_TIMEOUT_SEC/2;
+               if (ala->keepalive_interval == 0)
+                       ala->keepalive_interval = 1;
+       }
+       LX_DEBUG("keepalive option %d\n", strtst); /* Used in hot-pools.sh */
+out:
+       ssh_channel_send_eof(channel);
+       ssh_channel_close(channel);
+       ssh_channel_free(channel);
+
+       return rc;
+}
+
+static void lamigo_alr_init_features(struct alr_agent *ala)
+{
+       int i;
+
+       for (i = 0; i < 5; i++, sleep(PREVENT_SSH_BUSY_LOOP_SEC)) {
+               if (init_features(ala) > 0)
+                       return;
+       }
+
+       /* All requested features failed */
+       LX_DEBUG("keepalive option -1\n"); /* Used in hot-pools.sh */
+}
 
 /* Run ofd_access_log_reader once on the specified OSS.
  *
@@ -271,16 +332,21 @@ static int lamigo_alr_agent_run(struct alr_agent *ala)
        time_t last_checked = time(NULL);
        int rc, offset = 0, received = 0;
        char cmd[PATH_MAX];
+       char keepalivestr[32] = "";
        char buffer[16 * 1024];
 
        /* We do not try to read stderr from the access log reader. So
         * we must redirect it on the remote side to avoid
         * unintentional blocking. See the comment in
-        * lipe_ssh_exec(). */
+        * lipe_ssh_exec().
+        */
+       if (ala->keepalive_interval)
+               snprintf(keepalivestr, sizeof(keepalivestr), " --keepalive=%d",
+                        ala->keepalive_interval);
 
        snprintf(cmd, sizeof(cmd),
-                "%s -i %d -I %d %s 2> /dev/null",
-                opt.o_alr_cmd, opt.o_alr_ofd_interval,
+                "%s%s -i %d -I %d %s 2> /dev/null",
+                opt.o_alr_cmd, keepalivestr, opt.o_alr_ofd_interval,
                 mdtidx, opt.o_alr_extra_args);
 
        ala->ala_last_alr_restarted = time(NULL);
@@ -296,10 +362,10 @@ static int lamigo_alr_agent_run(struct alr_agent *ala)
 
        while (ssh_channel_is_open(channel) && !ssh_channel_is_eof(channel)) {
                /* Wait at max 4*ofd_access_log_reader batch interval */
-               const int timeout_sec = opt.o_alr_ofd_interval * 8;
+               const int timeout_msec = SSH_POLL_TIMEOUT_SEC * 1000;
                time_t now;
 
-               rc = ssh_channel_poll_timeout(channel, timeout_sec * 1000, 0);
+               rc = ssh_channel_poll_timeout(channel, timeout_msec, 0);
                if (rc == SSH_ERROR) {
                        LX_ERROR("polling ssh channel from '%s' failed\n",
                                 ala->ala_host);
@@ -314,7 +380,7 @@ static int lamigo_alr_agent_run(struct alr_agent *ala)
                        rc = SSH_OK;
                        if (restart_log_reader_on_timeout) {
                                LX_ERROR("waiting message from '%s' exceeded %dsec\n",
-                                        ala->ala_host, timeout_sec);
+                                        ala->ala_host, timeout_msec/1000);
                        }
                        break;
                }
@@ -352,16 +418,16 @@ out:
        if (channel != NULL) {
                ssh_channel_send_eof(channel);
                ssh_channel_close(channel);
+               ssh_channel_free(channel);
        }
 
-       ssh_channel_free(channel);
-
        return rc;
 } /* lamigo_alr_agent_run */
 
 static void *lamigo_alr_data_collection_thread(void *arg)
 {
        struct alr_agent *ala = arg;
+       lamigo_alr_init_features(ala);
 
        while (1) {
                time_t since_last_restart;
index 4e4372b..58b9719 100755 (executable)
@@ -2254,6 +2254,90 @@ test_73() {
 }
 run_test 73 "check strip size & count after replication"
 
+get_lamigo_keepalive() {
+       local facet=${LAMIGO_MDT_FACET[0]}
+       local log_file=$(lamigo_logfile $facet)
+       # 1679963639.802665 testfs-MDT0000: DEBUG: keepalive option 1
+       local awk_string="awk '/DEBUG: keepalive option/ { print \\\$6 }' ${log_file}"
+
+       local keepalive_opt=$(do_facet $facet $awk_string)
+
+       echo $keepalive_opt
+}
+
+verify_no_keepalive() {
+       local keepalive_opt=$1
+
+       (( keepalive_opt != 0 )) ||
+               echo "No OFD keepalive message option detected"
+
+       (( keepalive_opt != 1 )) ||
+               error "Not expected OFD keepalive message option"
+       return 0
+}
+
+verify_keepalive() {
+       local keepalive_opt=$1
+       local lamigo_facet=${LAMIGO_MDT_FACET[0]}
+       local lamigo_log_file=$(lamigo_logfile $lamigo_facet)
+       local count
+       local grep_string
+
+       (( keepalive_opt != 0 )) ||
+               error "No OFD keepalive message option detected"
+
+       (( keepalive_opt != 1 )) ||
+               echo "OFD keepalive message option detected"
+
+       # 1680134703.390641 testfs-MDT0000: DEBUG: keepalive msg from host:'ost-centOS8'
+
+       grep_string="grep -c 'DEBUG: keepalive msg from host' $lamigo_log_file"
+       count=$(do_facet $lamigo_facet $grep_string)
+
+       (( count >= 2 && count <= 4 )) ||
+               error "keepalive msg received $count. Expected 2..4"
+}
+
+test_74() {
+       local oss
+       local keepalive_opt
+
+       # Firstly determine whether ofd_access_log_reader has keepalve message option
+       local ofd_keepalive=$(do_facet ost1 ofd_access_log_reader --help |
+                               grep keepalive)
+       init_hot_pools_env
+
+       for oss in $(osts_nodes); do
+               break
+       done
+
+       do_nodes $(comma_list $(osts_nodes)) \
+               "$LCTL set_param obdfilter.${FSNAME}-OST*.access_log_size=4096 >&/dev/null"
+
+       # Secondly determine whether lamigo detects keepalve message option
+
+       LAMIGO_EXTRA_OPT="--oss=$oss --ofd-interval=3" start_lamigo_cmd
+       check_lamigo_is_started || error "failed to start lamigo"
+       stack_trap stop_lamigo_cmd
+
+       sleep 30
+
+       keepalive_opt=$(get_lamigo_keepalive)
+
+       [[ -n $keepalive_opt ]] ||
+               error "could not determine lamigo keepalive option: not found"
+
+       (( $keepalive_opt < 0 )) &&
+               error "could not determine lamigo keepalive option: ssh error"
+
+       if [[ -z $ofd_keepalive ]]; then
+               verify_no_keepalive $keepalive_opt
+       else
+               verify_keepalive $keepalive_opt
+       fi
+}
+run_test 74 "ofd keepalive message"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index f17c6cb..412c499 100755 (executable)
@@ -17654,6 +17654,36 @@ test_165f() {
 }
 run_test 165f "ofd_access_log_reader --exit-on-close works"
 
+test_165g() {
+       local count
+       local trace=$TMP/${tfile}.trace
+       local ofd_keepalive=$(do_facet ost1 ofd_access_log_reader --help |
+                               grep keepalive)
+
+       [[ -n "$ofd_keepalive" ]] || skip "ALR keepalive message unsupported"
+
+       setup_165
+       do_facet ost1 timeout 60 ofd_access_log_reader \
+               --keepalive=3 --batch-interval=4 --debug=- > "${trace}" &
+       sleep 40
+       stop ost1
+
+       do_facet ost1 killall -TERM ofd_access_log_reader
+       wait
+       rc=$?
+
+       if ((rc != 0)); then
+               error "ofd_access_log_reader exited with rc = '${rc}'"
+       fi
+
+       count=$(grep -c "send keepalive" ${trace})
+
+       # Duration 40 seconds / 4 secs interval = 10 times or so
+       (( count >= 9 && count <= 11 )) ||
+               error "keepalive msg received $count. Expected 9..11"
+}
+run_test 165g "ofd_access_log_reader --keepalive works"
+
 test_169() {
        # do directio so as not to populate the page cache
        log "creating a 10 Mb file"
index 75cd43e..e655204 100644 (file)
@@ -44,6 +44,8 @@
 #include "lstddef.h"
 #include "ofd_access_batch.h"
 
+static time_t when_last_printed;
+
 /* XXX Weird param order to be consistent with list_replace_init(). */
 static inline void list_replace_init(struct list_head *old_node,
                                struct list_head *new_node)
@@ -344,6 +346,7 @@ static void alre_printf(FILE *f, struct alr_entry *alre, int d)
                (unsigned long long)alre->alre_segment_count[d],
                (unsigned long long)alre->alre_count[d],
                (d == ALR_READ) ? 'r' : 'w');
+       when_last_printed = time(NULL);
 }
 
 struct alr_thread_arg {
@@ -353,6 +356,22 @@ struct alr_thread_arg {
        pthread_mutex_t *file_mutex;
 };
 
+static void alre_print_keepalive(struct alr_thread_arg *aa)
+{
+       int rc = pthread_mutex_lock(aa->file_mutex);
+
+       if (rc != 0)
+               FATAL("cannot lock batch file: %s\n",strerror(rc));
+
+       fprintf(aa->file, "# keepalive\n");
+       when_last_printed = time(NULL);
+       DEBUG("send keepalive\n");
+
+       rc = pthread_mutex_unlock(aa->file_mutex);
+       if (rc != 0)
+               FATAL("cannot unlock batch file: %s\n",strerror(rc));
+}
+
 void *alr_sort_and_print_thread(void *arg)
 {
        struct alr_entry *alre, *next;
@@ -373,10 +392,8 @@ void *alr_sort_and_print_thread(void *arg)
                goto out;
 
        sa = calloc(nr, sizeof(*sa));
-       if (!sa) {
-               fprintf(stderr, "cannot allocate memory for sorting\n");
-               exit(1);
-       }
+       if (!sa)
+               FATAL("cannot allocate memory for sorting\n");
 
        i = 0;
        list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
@@ -396,11 +413,8 @@ void *alr_sort_and_print_thread(void *arg)
        /* Prevent jumbled output from multiple concurrent sort and
         * print threads. */
        rc = pthread_mutex_lock(aa->file_mutex);
-       if (rc != 0) {
-               fprintf(stderr, "cannot lock batch file: %s\n",
-                       strerror(rc));
-               exit(1);
-       }
+       if (rc != 0)
+               FATAL("cannot lock batch file: %s\n",strerror(rc));
 
        /* there might be lots of items at @cut, but we want to limit total
         * output. so the first loop dumps all items > @cut and the second
@@ -426,13 +440,16 @@ void *alr_sort_and_print_thread(void *arg)
        }
 
        rc = pthread_mutex_unlock(aa->file_mutex);
-       if (rc != 0) {
-               fprintf(stderr, "cannot unlock batch file: %s\n",
-                       strerror(rc));
-               exit(1);
-       }
+       if (rc != 0)
+               FATAL("cannot unlock batch file: %s\n", strerror(rc));
 
 out:
+       /* If nothing printed - send keepalive */
+       if (keepalive_interval) {
+               if (time(NULL) - when_last_printed > keepalive_interval)
+                       alre_print_keepalive(aa);
+       }
+
        fflush(aa->file);
 
        list_for_each_entry_safe(alre, next, tmp, alre_fid_hash_node.fhn_node) {
index 8c5c8e9..819912f 100644 (file)
@@ -3,10 +3,42 @@
 #include <pthread.h>
 #include <sys/types.h>
 #include <linux/types.h>
+#include <errno.h>
+
+extern FILE *debug_file;
+extern FILE *trace_file;
+
+#define DEBUG(fmt, args...)                                            \
+       do {                                                            \
+               if (debug_file != NULL)                                 \
+                       fprintf(debug_file, "DEBUG %s:%d: "fmt, __func__, __LINE__, ##args); \
+       } while (0)
+
+#define TRACE(fmt, args...)                                            \
+       do {                                                            \
+               if (trace_file != NULL)                                 \
+                       fprintf(trace_file, "TRACE "fmt, ##args);       \
+       } while (0)
+
+#define DEBUG_D(x) DEBUG("%s = %"PRIdMAX"\n", #x, (intmax_t)x)
+#define DEBUG_P(x) DEBUG("%s = %p\n", #x, x)
+#define DEBUG_S(x) DEBUG("%s = '%s'\n", #x, x)
+#define DEBUG_U(x) DEBUG("%s = %"PRIuMAX"\n", #x, (uintmax_t)x)
+
+#define ERROR(fmt, args...) \
+       fprintf(stderr, "%s: "fmt, program_invocation_short_name, ##args)
+
+#define FATAL(fmt, args...)                    \
+       do {                                    \
+               ERROR("FATAL: "fmt, ##args);    \
+               exit(EXIT_FAILURE);             \
+       } while (0)
+
+
 
 struct lu_fid;
 struct alr_batch;
-
+extern unsigned long keepalive_interval;
 struct alr_batch *alr_batch_create(unsigned int shift);
 void alr_batch_destroy(struct alr_batch *alrb);
 int alr_batch_add(struct alr_batch *alrb, const char *obd_name,
index cf72d9e..181ed41 100644 (file)
 
 /* TODO fsname filter */
 
-static FILE *debug_file;
-static FILE *trace_file;
-
-#define DEBUG(fmt, args...)                                            \
-       do {                                                            \
-               if (debug_file != NULL)                                 \
-                       fprintf(debug_file, "DEBUG %s:%d: "fmt, __func__, __LINE__, ##args); \
-       } while (0)
-
-#define TRACE(fmt, args...)                                            \
-       do {                                                            \
-               if (trace_file != NULL)                                 \
-                       fprintf(trace_file, "TRACE "fmt, ##args);       \
-       } while (0)
-
-#define DEBUG_D(x) DEBUG("%s = %"PRIdMAX"\n", #x, (intmax_t)x)
-#define DEBUG_P(x) DEBUG("%s = %p\n", #x, x)
-#define DEBUG_S(x) DEBUG("%s = '%s'\n", #x, x)
-#define DEBUG_U(x) DEBUG("%s = %"PRIuMAX"\n", #x, (uintmax_t)x)
-
-#define ERROR(fmt, args...) \
-       fprintf(stderr, "%s: "fmt, program_invocation_short_name, ##args)
-
-#define FATAL(fmt, args...)                    \
-       do {                                    \
-               ERROR("FATAL: "fmt, ##args);    \
-               exit(EXIT_FAILURE);             \
-       } while (0)
+FILE *debug_file;
+FILE *trace_file;
 
 enum {
        ALR_EXIT_SUCCESS = INT_MIN + EXIT_SUCCESS,
@@ -144,6 +118,8 @@ static int alr_print_fraction = 100;
 #define P_ALR_LOG(al) \
        P_ALR_DEV(&(al)->alr_dev), major((al)->alr_rdev), minor((al)->alr_rdev)
 
+unsigned long keepalive_interval;
+
 static void alr_dev_free(int epoll_fd, struct alr_dev *ad)
 {
        if (ad == NULL)
@@ -694,6 +670,7 @@ void usage(void)
 "  -e, --exit-on-close            exit on close of all log devices\n"
 "  -I, --mdt-index-filter=INDEX   set log MDT index filter to INDEX\n"
 "  -h, --help                     display this help and exit\n"
+"  --keepalive=INTERVAL           print keepalive message every INTERVAL seconds\n"
 "  -l, --list                     print YAML list of available access logs\n"
 "  -d, --debug[=FILE]             print debug messages to FILE (stderr)\n"
 "  -s, --stats=FILE              print stats messages to FILE (stderr)\n"
@@ -724,6 +701,7 @@ int main(int argc, char *argv[])
                { .name = "batch-interval", .has_arg = required_argument, .val = 'i', },
                { .name = "batch-offset", .has_arg = required_argument, .val = 'o', },
                { .name = "exit-on-close", .has_arg = no_argument, .val = 'e', },
+               { .name = "keepalive", .has_arg = required_argument, .val = 'k' },
                { .name = "mdt-index-filter", .has_arg = required_argument, .val = 'I' },
                { .name = "debug", .has_arg = optional_argument, .val = 'd', },
                { .name = "help", .has_arg = no_argument, .val = 'h', },
@@ -748,6 +726,12 @@ int main(int argc, char *argv[])
                            errno != 0)
                                FATAL("invalid batch interval '%s'\n", optarg);
                        break;
+               case 'k':
+                       errno = 0;
+                       keepalive_interval = strtoll(optarg, NULL, 0);
+                       if (keepalive_interval > 1048576 || errno != 0)
+                               FATAL("invalid keepalive message interval '%s'\n", optarg);
+                       break;
                case 'o':
                        errno = 0;
                        batch_offset = strtoll(optarg, NULL, 0);
@@ -792,7 +776,7 @@ int main(int argc, char *argv[])
                                trace_file = stdout;
                        } else {
                                trace_file = fopen(optarg, "a");
-                               if (debug_file == NULL)
+                               if (trace_file == NULL)
                                        FATAL("cannot open debug file '%s': %s\n",
                                                optarg, strerror(errno));
                        }
@@ -804,7 +788,6 @@ int main(int argc, char *argv[])
                        exit(EXIT_FAILURE);
                }
        }
-
        if (batch_interval > 0) {
                alr_batch = alr_batch_create(-1);
                if (alr_batch == NULL)