Whamcloud - gitweb
LU-16228 utils: add lljobstat util 88/48888/28
authorLei Feng <flei@whamcloud.com>
Mon, 17 Oct 2022 05:36:14 +0000 (13:36 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 27 Jan 2023 00:33:37 +0000 (00:33 +0000)
lljobstat util read datas from job_stats file(s),
parse, aggregate the data and list top jobs.

For example:
$ ./lljobstats -n 1 -c 3
---
timestamp: 1665984678
top_jobs:
- ll_sa_3508505.0: {ops: 64, ga: 64}
- touch.500:       {ops: 6, op: 1, cl: 1, mn: 1, ga: 1, sa: 2}
- bash.0:          {ops: 3, ga: 3}
...

Signed-off-by: Lei Feng <flei@whamcloud.com>
Test-Parameters: trivial
Change-Id: I0c4ac619496c184a5aebbaf8674f5090ab722d72
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48888
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
lustre.spec.in
lustre/doc/Makefile.am
lustre/doc/lljobstat.8 [new file with mode: 0644]
lustre/tests/sanity.sh
lustre/utils/Makefile.am
lustre/utils/lljobstat [new file with mode: 0755]

index 5384d26..2e34a11 100644 (file)
@@ -762,6 +762,7 @@ echo '%{_libdir}/lustre/tests/lutf/*' >>lustre-tests.files
 %endif
 
 %{_bindir}/llobdstat
+%{_bindir}/lljobstat
 %{_bindir}/llstat
 %{_bindir}/plot-llstat
 %{_datadir}/lustre
index 56dabd7..af50ace 100644 (file)
@@ -237,6 +237,7 @@ SERVER_MANFILES =                           \
        lctl-snapshot-umount.8                  \
        ll_decode_filter_fid.8                  \
        llobdstat.8                             \
+       lljobstat.8                             \
        llog_reader.8                           \
        llverdev.8                              \
        lshowmount.8                            \
diff --git a/lustre/doc/lljobstat.8 b/lustre/doc/lljobstat.8
new file mode 100644 (file)
index 0000000..63aa762
--- /dev/null
@@ -0,0 +1,80 @@
+.TH lljobstat 8 "Oct 14, 2022" Lustre "utilities"
+
+.SH NAME
+lljobstat \- display top jobs and statistics
+
+.SH SYNOPSIS
+.B "llobdstat"
+.RB [ -c|--count
+.IR COUNT ]
+.RB [ -i|--interval
+.IR INTERVAL ]
+.RB [ -n|--repeats
+.IR REPEATS ]
+.RB [ -m|--mdt ]
+.RB [ -o|--ost ]
+.RB [ --param
+.IR PARAM ]
+.RB [ --fullname ]
+.RB [ --no-fullname ]
+
+.SH DESCRIPTION
+.B lljobstat
+reads and parses the job_stats files on local node,
+sums up the operations of each job, and displays the top jobs.
+Repeat for some times or forever with given interval.
+.P
+Type Ctrl-C to stop printing.
+
+.SS Abbreviations
+\fBlljobstat\fR displays abbreviations of operation names as listed below:
+.P
+.nf
+ops: total number of operations
+cl: close,      cr: create,     dt: destroy,    fa: fallocate, ga: getattr
+gi: get_info,   gx: getxattr,   ln: link,       mi: migrate,   mk: mkdir
+mn: mknod,      mv: rename,     op: open,       pa: prealloc,  pu: punch
+qc: quotactl,   rd: read,       rm: rmdir,      sa: setattr,   si: set_info
+st: statfs,     sx: setxattr,   sy: sync,       ul: unlink,    wr: write
+.fi
+
+.SH "OPTIONS"
+.TP
+\fB-c|--count\fR \fICOUNT\fR
+how many top jobs are listed. Default 5.
+.TP
+\fB-i|--interval\fR \fIINTERVAL\fR
+interval in seconds before list top jobs again. Default 10.
+.TP
+\fB-n|--repeats\fR \fIREPEATS\fR
+how many times to repeat. Default unlimited.
+.TP
+\fB-m|--mdt\fR
+get data from only job_stats of MDTs.
+.TP
+\fB-o|--ost\fR
+get data from only job_stats of OSTs.
+.TP
+\fB--param\fR \fIPARAM\fR
+get data from only PARAM path. For example, "*.lustre-*.job_stat".
+.TP
+\fB--fullname\fR
+show full name of operations. Default no.
+.TP
+\fB--no-fullname\fR
+show abbreviated name of operations.
+.TP
+\fB-h|--help\fR
+print help message.
+
+.SH EXAMPLE
+.nf
+# lljobstat -n 1 -c 3
+---
+timestamp: 1665623345
+top_jobs:
+- rm.0:            {ops: 99, cl: 32, ul: 16, rm: 16, ga: 19, st: 16}
+- touch.500:       {ops: 48, op: 8, cl: 8, mn: 8, ga: 8, sa: 16}
+- dd.0:            {ops: 38, op: 4, cl: 4, mn: 1, ga: 1, sa: 3, gx: 3, wr: 19, pu: 3}
+\[char46]..
+.fi
index fb04f81..44c02a3 100755 (executable)
@@ -19209,10 +19209,10 @@ run_test 205c "Verify client stats format"
 test_205d() {
        local file=$DIR/$tdir/$tfile
 
-       (( $MDS1_VERSION >= $(version_code 2.15.52) )) ||
-               skip "need lustre >= 2.15.51"
-       (( $OST1_VERSION >= $(version_code 2.15.52) )) ||
-               skip "need lustre >= 2.15.51"
+       (( $MDS1_VERSION >= $(version_code 2.15.53) )) ||
+               skip "need lustre >= 2.15.53 for lljobstat"
+       (( $OST1_VERSION >= $(version_code 2.15.53) )) ||
+               skip "need lustre >= 2.15.53 for lljobstat"
        verify_yaml_available || skip_env "YAML verification not installed"
 
        test_mkdir -i 0 $DIR/$tdir
@@ -19222,6 +19222,7 @@ test_205d() {
                error "failed to write data to $file"
        mv $file $file.2
 
+       do_facet mds1 "$LCTL get_param -n mdt.$FSNAME-MDT0000.rename_stats"
        echo -n 'verify rename_stats...'
        do_facet mds1 "$LCTL get_param -n mdt.$FSNAME-MDT0000.rename_stats" |
                verify_yaml || error "rename_stats is not valid YAML"
@@ -19239,6 +19240,41 @@ test_205d() {
 }
 run_test 205d "verify the format of some stats files"
 
+test_205e() {
+       local ops_comma
+       local file=$DIR/$tdir/$tfile
+
+       (( $MDS1_VERSION >= $(version_code 2.15.53) )) ||
+               skip "need lustre >= 2.15.53 for lljobstat"
+       (( $OST1_VERSION >= $(version_code 2.15.53) )) ||
+               skip "need lustre >= 2.15.53 for lljobstat"
+       verify_yaml_available || skip_env "YAML verification not installed"
+
+       mkdir_on_mdt0 $DIR/$tdir || error "failed to create dir"
+
+       $LFS setstripe -E EOF -i 0 -c 1 $file ||
+               error "failed to create $file on ost1"
+       dd if=/dev/zero of=$file bs=1M count=10 oflag=sync ||
+               error "failed to write data to $file"
+
+       do_facet mds1 "$LCTL get_param *.*.job_stats"
+       do_facet ost1 "$LCTL get_param *.*.job_stats"
+
+       do_facet ost1 "lljobstat -n 1 -i 0 -c 1000"
+       do_facet ost1 "lljobstat -n 1 -i 0 -c 1000" | verify_yaml ||
+               error "The output of lljobstat is not an valid YAML"
+
+       # verify that job dd.0 does exist and has some ops on ost1
+       # typically this line is like:
+       # - dd.0:            {ops: 20, ...}
+       ops_comma=$(do_facet ost1 "lljobstat -n 1 -i 0 -c 1000" |
+                   awk '$2=="dd.0:" {print $4}')
+
+       (( ${ops_comma%,} >= 10 )) ||
+               error "cannot find job dd.0 with ops >= 10"
+}
+run_test 205e "verify the output of lljobstat"
+
 # LU-1480, LU-1773 and LU-1657
 test_206() {
        mkdir -p $DIR/$tdir
@@ -29106,6 +29142,7 @@ test_906() {
 }
 run_test 906 "Simple test for io_uring I/O engine via fio"
 
+
 complete $SECONDS
 [ -f $EXT2_DEV ] && rm $EXT2_DEV || true
 check_and_cleanup_lustre
index 9dfa304..18d0bb1 100644 (file)
@@ -31,7 +31,7 @@ rootsbin_PROGRAMS = mount.lustre
 if SERVER
 rootsbin_PROGRAMS += mount.lustre_tgt
 endif
-bin_SCRIPTS   = llstat llobdstat plot-llstat
+bin_SCRIPTS   = llstat llobdstat lljobstat plot-llstat
 bin_PROGRAMS  = lfs
 sbin_SCRIPTS  = ldlm_debug_upcall
 sbin_PROGRAMS = lctl l_getidentity llverdev llverfs lustre_rsync \
@@ -261,7 +261,7 @@ wiretest_SOURCES = wiretest.c
 
 endif # UTILS
 
-EXTRA_DIST = llstat llobdstat plot-llstat ldlm_debug_upcall liblustreapi.map
+EXTRA_DIST = llstat llobdstat lljobstat plot-llstat ldlm_debug_upcall liblustreapi.map
 
 # NOTE: this should only be run on i386.
 newwiretest: wirehdr.c wirecheck
diff --git a/lustre/utils/lljobstat b/lustre/utils/lljobstat
new file mode 100755 (executable)
index 0000000..b235dc4
--- /dev/null
@@ -0,0 +1,240 @@
+#!/usr/bin/python3
+
+'''
+lljobstat command. Read job_stats files, parse and aggregate data of every
+job, show top jobs
+'''
+
+import argparse
+import errno
+import subprocess
+import sys
+import time
+import yaml
+
+
+class ArgParser: # pylint: disable=too-few-public-methods
+    '''
+    Class to define lljobstat command arguments
+    and parse the real command line arguments.
+    '''
+    def __init__(self):
+        self.args = None
+
+    def run(self):
+        '''
+        define and parse arguments
+        '''
+        parser = argparse.ArgumentParser(prog='lljobstat',
+                                         description='List top jobs.')
+        parser.add_argument('-c', '--count', type=int, default=5,
+                            help='the number of top jobs to be listed (default 5).')
+        parser.add_argument('-i', '--interval', type=int, default=10,
+                            help='the interval in seconds to check job stats again (default 10).')
+        parser.add_argument('-n', '--repeats', type=int, default=-1,
+                            help='the times to repeat the parsing (default unlimited).')
+        parser.add_argument('--param', type=str, default='*.*.job_stats',
+                            help='the param path to be checked (default *.*.job_stats).')
+        parser.add_argument('-o', '--ost', dest='param', action='store_const',
+                            const='obdfilter.*.job_stats',
+                            help='check only OST job stats.')
+        parser.add_argument('-m', '--mdt', dest='param', action='store_const',
+                            const='mdt.*.job_stats',
+                            help='check only MDT job stats.')
+        parser.add_argument('--fullname', action='store_true', default=False,
+                            help='show full operation name (default False).')
+        parser.add_argument('--no-fullname', dest='fullname',
+                            action='store_false',
+                            help='show abbreviated operations name.')
+
+        self.args = parser.parse_args()
+
+
+class JobStatsParser:
+    '''
+    Class to get/parse/aggregate/sort/print top jobs in job_stats
+    '''
+    op_keys = {
+        'ops': 'ops',
+        'cr' : 'create',
+        'op' : 'open',
+        'cl' : 'close',
+        'mn' : 'mknod',
+        'ln' : 'link',
+        'ul' : 'unlink',
+        'mk' : 'mkdir',
+        'rm' : 'rmdir',
+        'mv' : 'rename',
+        'ga' : 'getattr',
+        'sa' : 'setattr',
+        'gx' : 'getxattr',
+        'sx' : 'setxattr',
+        'st' : 'statfs',
+        'sy' : 'sync',
+        'rd' : 'read',
+        'wr' : 'write',
+        'pu' : 'punch',
+        'mi' : 'migrate',
+        'fa' : 'fallocate',
+        'dt' : 'destroy',
+        'gi' : 'get_info',
+        'si' : 'set_info',
+        'qc' : 'quotactl',
+        'pa' : 'prealloc'
+    }
+
+    def __init__(self):
+        self.args = None
+
+    def list_param(self, param_pattern): # pylint: disable=no-self-use
+        '''
+        list param paths with given param pattern
+        '''
+        cmd = ['lctl', 'list_param', param_pattern]
+        try:
+            output = subprocess.check_output(cmd).decode()
+            return output.splitlines()
+        except subprocess.CalledProcessError as err:
+            if err.returncode == errno.ENOENT:
+                return []
+
+    def parse_single_job_stats(self, param): # pylint: disable=no-self-use
+        '''
+        read single job_stats file, parse it and return an object
+        '''
+        cmd = ['lctl', 'get_param', '-n', param]
+        output = subprocess.check_output(cmd).decode()
+        try:
+            yaml_obj = yaml.safe_load(output)  # need several seconds...
+        except yaml.scanner.ScannerError:
+            # only print the file name here
+            print("failed to parse the content of %s" % param, file=sys.stdout)
+            raise
+
+        return yaml_obj
+
+    def merge_job(self, jobs, job):
+        '''
+        merge stats data of job to jobs
+        '''
+        job2 = jobs.get(job['job_id'], {})
+
+        for key in job.keys():
+            if key not in self.op_keys.values():
+                continue
+            if job[key]['samples'] == 0:
+                continue
+
+            job2[key] = job2.get(key, 0) + job[key]['samples']
+            job2['ops'] = job2.get('ops', 0) + job[key]['samples']
+
+        job2['job_id'] = job['job_id']
+        jobs[job['job_id']] = job2
+
+    def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
+        '''
+        insert job to top_jobs in descending order by the key job['ops'].
+        top_jobs is an array with at most count elements
+        '''
+        top_jobs.append(job)
+
+        for i in range(len(top_jobs) - 2, -1, -1):
+            if job['ops'] > top_jobs[i]['ops']:
+                top_jobs[i + 1] = top_jobs[i]
+                top_jobs[i] = job
+            else:
+                break
+
+        if len(top_jobs) > count:
+            top_jobs.pop()
+
+    def pick_top_jobs(self, jobs, count):
+        '''
+        choose at most count elements from jobs, put them in an array in
+        descending order by the key job['ops'].
+        '''
+        top_jobs = []
+        for _, job in jobs.items():
+            self.insert_job_sorted(top_jobs, count, job)
+
+        return top_jobs
+
+    def print_job(self, job):
+        '''
+        print single job
+        '''
+        print('- %-16s {' % (job['job_id'] + ':'), end='')
+        first = True
+        for key, val in self.op_keys.items():
+            if not val in job.keys():
+                continue
+            if not first:
+                print(", ", end='')
+
+            opname = key
+            if self.args.fullname:
+                opname = self.op_keys[key]
+
+            print('%s: %d' % (opname, job[val]), end='')
+            if first:
+                first = False
+        print('}')
+
+    def print_top_jobs(self, top_jobs):
+        '''
+        print top_jobs in YAML
+        '''
+        print('---') # mark the begining of YAML doc in stream
+        print("timestamp: %d" % int(time.time()))
+        print("top_jobs:")
+        for job in top_jobs:
+            self.print_job(job)
+        print('...') # mark the end of YAML doc in stream
+
+    def run_once(self):
+        '''
+        scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
+        '''
+        jobs = {}
+        for param in self.list_param(self.args.param):
+            obj = self.parse_single_job_stats(param)
+            if obj['job_stats'] is None:
+                continue
+
+            for job in obj['job_stats']:
+                self.merge_job(jobs, job)
+
+        top_jobs = self.pick_top_jobs(jobs, self.args.count)
+        self.print_top_jobs(top_jobs)
+
+    def run_once_retry(self):
+        '''
+        Call run_once. If run_once succeeds, return.
+        If run_once throws an exception, retry for few times.
+        '''
+        for i in range(2, -1, -1):  # 2, 1, 0
+            try:
+                return self.run_once()
+            except: # pylint: disable=bare-except
+                if i == 0:
+                    raise
+
+    def run(self):
+        '''
+        run task periodically or for some times with given interval
+        '''
+        argparser = ArgParser()
+        argparser.run()
+        self.args = argparser.args
+
+        i = 0
+        while True:
+            self.run_once_retry()
+            i += 1
+            if self.args.repeats != -1 and i >= self.args.repeats:
+                break
+            time.sleep(self.args.interval)
+
+
+if __name__ == "__main__":
+    JobStatsParser().run()