--- /dev/null
+#!/usr/bin/python3
+
+'''
+lljobstat command. Read job_stats files, parse and aggregate data of every
+job, show top jobs
+'''
+
+import argparse
+import errno
+import subprocess
+import sys
+import time
+import yaml
+
+
+class ArgParser: # pylint: disable=too-few-public-methods
+ '''
+ Class to define lljobstat command arguments
+ and parse the real command line arguments.
+ '''
+ def __init__(self):
+ self.args = None
+
+ def run(self):
+ '''
+ define and parse arguments
+ '''
+ parser = argparse.ArgumentParser(prog='lljobstat',
+ description='List top jobs.')
+ parser.add_argument('-c', '--count', type=int, default=5,
+ help='the number of top jobs to be listed (default 5).')
+ parser.add_argument('-i', '--interval', type=int, default=10,
+ help='the interval in seconds to check job stats again (default 10).')
+ parser.add_argument('-n', '--repeats', type=int, default=-1,
+ help='the times to repeat the parsing (default unlimited).')
+ parser.add_argument('--param', type=str, default='*.*.job_stats',
+ help='the param path to be checked (default *.*.job_stats).')
+ parser.add_argument('-o', '--ost', dest='param', action='store_const',
+ const='obdfilter.*.job_stats',
+ help='check only OST job stats.')
+ parser.add_argument('-m', '--mdt', dest='param', action='store_const',
+ const='mdt.*.job_stats',
+ help='check only MDT job stats.')
+ parser.add_argument('--fullname', action='store_true', default=False,
+ help='show full operation name (default False).')
+ parser.add_argument('--no-fullname', dest='fullname',
+ action='store_false',
+ help='show abbreviated operations name.')
+
+ self.args = parser.parse_args()
+
+
+class JobStatsParser:
+ '''
+ Class to get/parse/aggregate/sort/print top jobs in job_stats
+ '''
+ op_keys = {
+ 'ops': 'ops',
+ 'cr' : 'create',
+ 'op' : 'open',
+ 'cl' : 'close',
+ 'mn' : 'mknod',
+ 'ln' : 'link',
+ 'ul' : 'unlink',
+ 'mk' : 'mkdir',
+ 'rm' : 'rmdir',
+ 'mv' : 'rename',
+ 'ga' : 'getattr',
+ 'sa' : 'setattr',
+ 'gx' : 'getxattr',
+ 'sx' : 'setxattr',
+ 'st' : 'statfs',
+ 'sy' : 'sync',
+ 'rd' : 'read',
+ 'wr' : 'write',
+ 'pu' : 'punch',
+ 'mi' : 'migrate',
+ 'fa' : 'fallocate',
+ 'dt' : 'destroy',
+ 'gi' : 'get_info',
+ 'si' : 'set_info',
+ 'qc' : 'quotactl',
+ 'pa' : 'prealloc'
+ }
+
+ def __init__(self):
+ self.args = None
+
+ def list_param(self, param_pattern): # pylint: disable=no-self-use
+ '''
+ list param paths with given param pattern
+ '''
+ cmd = ['lctl', 'list_param', param_pattern]
+ try:
+ output = subprocess.check_output(cmd).decode()
+ return output.splitlines()
+ except subprocess.CalledProcessError as err:
+ if err.returncode == errno.ENOENT:
+ return []
+
+ def parse_single_job_stats(self, param): # pylint: disable=no-self-use
+ '''
+ read single job_stats file, parse it and return an object
+ '''
+ cmd = ['lctl', 'get_param', '-n', param]
+ output = subprocess.check_output(cmd).decode()
+ try:
+ yaml_obj = yaml.safe_load(output) # need several seconds...
+ except yaml.scanner.ScannerError:
+ # only print the file name here
+ print("failed to parse the content of %s" % param, file=sys.stdout)
+ raise
+
+ return yaml_obj
+
+ def merge_job(self, jobs, job):
+ '''
+ merge stats data of job to jobs
+ '''
+ job2 = jobs.get(job['job_id'], {})
+
+ for key in job.keys():
+ if key not in self.op_keys.values():
+ continue
+ if job[key]['samples'] == 0:
+ continue
+
+ job2[key] = job2.get(key, 0) + job[key]['samples']
+ job2['ops'] = job2.get('ops', 0) + job[key]['samples']
+
+ job2['job_id'] = job['job_id']
+ jobs[job['job_id']] = job2
+
+ def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
+ '''
+ insert job to top_jobs in descending order by the key job['ops'].
+ top_jobs is an array with at most count elements
+ '''
+ top_jobs.append(job)
+
+ for i in range(len(top_jobs) - 2, -1, -1):
+ if job['ops'] > top_jobs[i]['ops']:
+ top_jobs[i + 1] = top_jobs[i]
+ top_jobs[i] = job
+ else:
+ break
+
+ if len(top_jobs) > count:
+ top_jobs.pop()
+
+ def pick_top_jobs(self, jobs, count):
+ '''
+ choose at most count elements from jobs, put them in an array in
+ descending order by the key job['ops'].
+ '''
+ top_jobs = []
+ for _, job in jobs.items():
+ self.insert_job_sorted(top_jobs, count, job)
+
+ return top_jobs
+
+ def print_job(self, job):
+ '''
+ print single job
+ '''
+ print('- %-16s {' % (job['job_id'] + ':'), end='')
+ first = True
+ for key, val in self.op_keys.items():
+ if not val in job.keys():
+ continue
+ if not first:
+ print(", ", end='')
+
+ opname = key
+ if self.args.fullname:
+ opname = self.op_keys[key]
+
+ print('%s: %d' % (opname, job[val]), end='')
+ if first:
+ first = False
+ print('}')
+
+ def print_top_jobs(self, top_jobs):
+ '''
+ print top_jobs in YAML
+ '''
+ print('---') # mark the begining of YAML doc in stream
+ print("timestamp: %d" % int(time.time()))
+ print("top_jobs:")
+ for job in top_jobs:
+ self.print_job(job)
+ print('...') # mark the end of YAML doc in stream
+
+ def run_once(self):
+ '''
+ scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
+ '''
+ jobs = {}
+ for param in self.list_param(self.args.param):
+ obj = self.parse_single_job_stats(param)
+ if obj['job_stats'] is None:
+ continue
+
+ for job in obj['job_stats']:
+ self.merge_job(jobs, job)
+
+ top_jobs = self.pick_top_jobs(jobs, self.args.count)
+ self.print_top_jobs(top_jobs)
+
+ def run_once_retry(self):
+ '''
+ Call run_once. If run_once succeeds, return.
+ If run_once throws an exception, retry for few times.
+ '''
+ for i in range(2, -1, -1): # 2, 1, 0
+ try:
+ return self.run_once()
+ except: # pylint: disable=bare-except
+ if i == 0:
+ raise
+
+ def run(self):
+ '''
+ run task periodically or for some times with given interval
+ '''
+ argparser = ArgParser()
+ argparser.run()
+ self.args = argparser.args
+
+ i = 0
+ while True:
+ self.run_once_retry()
+ i += 1
+ if self.args.repeats != -1 and i >= self.args.repeats:
+ break
+ time.sleep(self.args.interval)
+
+
+if __name__ == "__main__":
+ JobStatsParser().run()