From ef2555d7af21bd35756805b13e6b458f56cecf54 Mon Sep 17 00:00:00 2001
From: Lei Feng
Date: Sun, 10 Dec 2023 16:45:38 +0800
Subject: [PATCH] LU-17352 utils: lljobstat can read dumped stats files

Improve the lljobstat command to read dumped stats files. Usually
such a file is generated by the command:

  lctl get_param *.*.job_stats > all_job_stats.txt

Multiple files can be specified with multiple --statsfile options,
for example:

  lljobstat --statsfile=1.txt --statsfile=2.txt

The stats data from all files are added up and sorted, then the top
jobs are listed.

Try to use CLoader, if it is available, to accelerate the YAML
parsing.

Handle SIGINT and exit silently if lljobstat is interrupted while
periodically reading the system job_stats files.

Fix a bug that occurs when the job_id is a pure number.

Signed-off-by: Lei Feng
Test-Parameters: trivial
Change-Id: Iee1ce69d2befb9d021e34effd4fc65a47297c1fb
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/53397
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: Andreas Dilger
Reviewed-by: Arshad Hussain
Reviewed-by: Oleg Drokin
---
 lustre/tests/sanity.sh |  19 ++++++++++
 lustre/utils/lljobstat | 100 ++++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 105 insertions(+), 14 deletions(-)

diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index c5c00df..6ad6e50 100755
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -31213,6 +31213,24 @@ test_833() {
 }
 run_test 833 "Mixed buffered/direct read and write should not return -EIO"
 
+test_850() {
+	local dir=$DIR/$tdir
+	local file=$dir/$tfile
+	local statsfile=$dir/all_job_stats.txt
+
+	test_mkdir -p $dir || error "failed to create dir $dir"
+	echo "abcdefg" > $file || error "failed to create file $file"
+
+	# read job_stats on the living system
+	lljobstat -n 1 ||
+		error "failed to run lljobstat on living system"
+
+	$LCTL get_param *.*.job_stats > $statsfile
+	lljobstat --statsfile=$statsfile ||
+		error "failed to run lljobstat on file $statsfile"
+}
+run_test 850 "lljobstat can parse living and aggregated job_stats"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
@@ -31441,6 +31459,7 @@ test_907() {
 }
 run_test 907 "write rpc error during unlink"
 
+
 complete_test $SECONDS
 [ -f $EXT2_DEV ] && rm $EXT2_DEV || true
 check_and_cleanup_lustre
diff --git a/lustre/utils/lljobstat b/lustre/utils/lljobstat
index b235dc4..33529d5 100755
--- a/lustre/utils/lljobstat
+++ b/lustre/utils/lljobstat
@@ -7,11 +7,18 @@ job, show top jobs
 
 import argparse
 import errno
+import signal
 import subprocess
 import sys
 import time
 import yaml
+try:
+    from yaml import CLoader as Loader
+except ImportError:
+    from yaml import Loader
 
+def exit_silently(signum, frame):
+    sys.exit(0)
 
 class ArgParser: # pylint: disable=too-few-public-methods
     '''
@@ -26,7 +33,8 @@ class ArgParser: # pylint: disable=too-few-public-methods
         define and parse arguments
         '''
         parser = argparse.ArgumentParser(prog='lljobstat',
-                                         description='List top jobs.')
+                                         description='List top jobs.',
+                                         formatter_class=argparse.RawTextHelpFormatter)
         parser.add_argument('-c', '--count', type=int, default=5,
                             help='the number of top jobs to be listed (default 5).')
         parser.add_argument('-i', '--interval', type=int, default=10,
@@ -46,6 +54,15 @@ class ArgParser: # pylint: disable=too-few-public-methods
         parser.add_argument('--no-fullname', dest='fullname',
                             action='store_false',
                             help='show abbreviated operations name.')
+        parser.add_argument('--statsfile', type=str, action='append',
+                            help="Parse the specified file instead of system job_stats files.\n"
+                            "Usually the file is generated by the command:\n"
+                            "# lctl get_param *.*.job_stats > job_stats.txt\n"
+                            "Multiple files can be specified by giving --statsfile\n"
+                            "multiple times. For example:\n"
+                            "# lljobstat --statsfile=1.txt --statsfile=2.txt\n"
+                            "The stats data of jobs from all statsfiles will be\n"
+                            "added up and sorted, then the top jobs are listed.\n")
 
         self.args = parser.parse_args()
 
@@ -98,17 +115,25 @@
         if err.returncode == errno.ENOENT:
             return []
 
-    def parse_single_job_stats(self, param): # pylint: disable=no-self-use
+    def parse_single_job_stats(self, param=None, string=None): # pylint: disable=no-self-use
         '''
-        read single job_stats file, parse it and return an object
+        read a single job_stats file or string, parse it and return an object
         '''
-        cmd = ['lctl', 'get_param', '-n', param]
-        output = subprocess.check_output(cmd).decode()
+        output = ""
+        if string is not None:
+            output = string
+        else:
+            cmd = ['lctl', 'get_param', '-n', param]
+            output = subprocess.check_output(cmd).decode()
+
         try:
-            yaml_obj = yaml.safe_load(output)  # need several seconds...
+            yaml_obj = yaml.load(output, Loader=Loader)  # may take several seconds
         except yaml.scanner.ScannerError:
             # only print the file name here
-            print("failed to parse the content of %s" % param, file=sys.stdout)
+            if param is not None:
+                print("failed to parse the content of %s" % param, file=sys.stdout)
+            if string is not None:
+                print("failed to parse the string", file=sys.stdout)
             raise
         return yaml_obj
 
@@ -163,7 +188,7 @@
         '''
         print single job
         '''
-        print('- %-16s {' % (job['job_id'] + ':'), end='')
+        print('- %-16s {' % (str(job['job_id']) + ':'), end='')
         first = True
         for key, val in self.op_keys.items():
             if not val in job.keys():
@@ -191,18 +216,58 @@
                 self.print_job(job)
         print('...') # mark the end of YAML doc in stream
 
+    def parse_file(self, file, jobs):
+        '''
+        parse a single file; the file may combine several job_stats outputs
+        '''
+        job_stats_str = ""
+        started = False
+        with open(file) as file_handler:
+            for line in file_handler:
+                # skip lines until the first "job_stats:" is seen
+                if not started:
+                    if line[0:10] == "job_stats:":
+                        started = True
+                        job_stats_str += line
+                    continue
+
+                if line == "\n" or line[0:2] == "  " or line[0:2] == "- ":
+                    # blank, indented, or "- " lines continue the segment
+                    job_stats_str += line
+                else:
+                    # any other line ends the current segment
+                    obj = self.parse_single_job_stats(string=job_stats_str)
+                    if obj is not None and obj['job_stats'] is not None:
+                        for job in obj['job_stats']:
+                            self.merge_job(jobs, job)
+
+                    if line[0:10] == "job_stats:":
+                        # start of the next segment
+                        job_stats_str = line
+                    else:
+                        job_stats_str = ""
+                        started = False
+            else:
+                # parse the segment remaining at end of file
+                obj = self.parse_single_job_stats(string=job_stats_str)
+                if obj is not None and obj['job_stats'] is not None:
+                    for job in obj['job_stats']:
+                        self.merge_job(jobs, job)
+
     def run_once(self):
         '''
         scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
         '''
         jobs = {}
-        for param in self.list_param(self.args.param):
-            obj = self.parse_single_job_stats(param)
-            if obj['job_stats'] is None:
-                continue
-
-            for job in obj['job_stats']:
-                self.merge_job(jobs, job)
+        if self.args.statsfile:
+            for file in self.args.statsfile:
+                self.parse_file(file, jobs)
+        else:
+            for param in self.list_param(self.args.param):
+                obj = self.parse_single_job_stats(param=param)
+                if obj is not None and obj['job_stats'] is not None:
+                    for job in obj['job_stats']:
+                        self.merge_job(jobs, job)
 
         top_jobs = self.pick_top_jobs(jobs, self.args.count)
         self.print_top_jobs(top_jobs)
@@ -227,6 +292,13 @@ class JobStatsParser:
         argparser.run()
         self.args = argparser.args
 
+        if self.args.statsfile:
+            self.run_once()
+            return
+
+        # exit silently if Ctrl+C is pressed in the loop below
+        signal.signal(signal.SIGINT, exit_silently)
+
         i = 0
         while True:
             self.run_once_retry()
-- 
1.8.3.1
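
Note (illustration, not part of the patch): the segment-splitting logic
that parse_file() implements above can be exercised stand-alone. Below
is a minimal Python sketch under the same assumptions as the patch
(PyYAML installed, optionally with libyaml bindings); the input file
name sample_job_stats.txt and the helper name split_job_stats_docs()
are hypothetical.

#!/usr/bin/env python3
# Minimal sketch of the dump-splitting approach used by parse_file().
# Assumes a dump created by: lctl get_param *.*.job_stats > sample_job_stats.txt
import yaml

try:
    from yaml import CSafeLoader as Loader  # C loader is much faster when libyaml is present
except ImportError:
    from yaml import SafeLoader as Loader


def split_job_stats_docs(path):
    '''yield each "job_stats:" YAML document found in a dumped stats file'''
    segment = ""
    with open(path) as fh:
        for line in fh:
            if line.startswith("job_stats:"):
                if segment:
                    yield segment  # flush the previous document
                segment = line
            elif segment and (line == "\n" or line.startswith(("  ", "- "))):
                segment += line  # blank, indented, or "- " lines continue the document
            elif segment:
                yield segment  # e.g. a "*.*.job_stats=" line ends the document
                segment = ""
    if segment:
        yield segment  # flush the document left at end of file


if __name__ == "__main__":
    total = 0
    for doc in split_job_stats_docs("sample_job_stats.txt"):
        obj = yaml.load(doc, Loader=Loader)
        if obj and obj.get("job_stats"):
            total += len(obj["job_stats"])
    print("parsed %d job entries" % total)

Run against a dump produced by "lctl get_param *.*.job_stats", this
should count the same job entries that lljobstat --statsfile aggregates.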