Whamcloud - gitweb
LU-16228 utils: add lljobstat util
[fs/lustre-release.git] / lustre / utils / lljobstat
1 #!/usr/bin/python3
2
3 '''
4 lljobstat command. Read job_stats files, parse and aggregate data of every
5 job, show top jobs
6 '''
7
8 import argparse
9 import errno
10 import subprocess
11 import sys
12 import time
13 import yaml
14
15
16 class ArgParser: # pylint: disable=too-few-public-methods
17     '''
18     Class to define lljobstat command arguments
19     and parse the real command line arguments.
20     '''
21     def __init__(self):
22         self.args = None
23
24     def run(self):
25         '''
26         define and parse arguments
27         '''
28         parser = argparse.ArgumentParser(prog='lljobstat',
29                                          description='List top jobs.')
30         parser.add_argument('-c', '--count', type=int, default=5,
31                             help='the number of top jobs to be listed (default 5).')
32         parser.add_argument('-i', '--interval', type=int, default=10,
33                             help='the interval in seconds to check job stats again (default 10).')
34         parser.add_argument('-n', '--repeats', type=int, default=-1,
35                             help='the times to repeat the parsing (default unlimited).')
36         parser.add_argument('--param', type=str, default='*.*.job_stats',
37                             help='the param path to be checked (default *.*.job_stats).')
38         parser.add_argument('-o', '--ost', dest='param', action='store_const',
39                             const='obdfilter.*.job_stats',
40                             help='check only OST job stats.')
41         parser.add_argument('-m', '--mdt', dest='param', action='store_const',
42                             const='mdt.*.job_stats',
43                             help='check only MDT job stats.')
44         parser.add_argument('--fullname', action='store_true', default=False,
45                             help='show full operation name (default False).')
46         parser.add_argument('--no-fullname', dest='fullname',
47                             action='store_false',
48                             help='show abbreviated operations name.')
49
50         self.args = parser.parse_args()
51
52
53 class JobStatsParser:
54     '''
55     Class to get/parse/aggregate/sort/print top jobs in job_stats
56     '''
57     op_keys = {
58         'ops': 'ops',
59         'cr' : 'create',
60         'op' : 'open',
61         'cl' : 'close',
62         'mn' : 'mknod',
63         'ln' : 'link',
64         'ul' : 'unlink',
65         'mk' : 'mkdir',
66         'rm' : 'rmdir',
67         'mv' : 'rename',
68         'ga' : 'getattr',
69         'sa' : 'setattr',
70         'gx' : 'getxattr',
71         'sx' : 'setxattr',
72         'st' : 'statfs',
73         'sy' : 'sync',
74         'rd' : 'read',
75         'wr' : 'write',
76         'pu' : 'punch',
77         'mi' : 'migrate',
78         'fa' : 'fallocate',
79         'dt' : 'destroy',
80         'gi' : 'get_info',
81         'si' : 'set_info',
82         'qc' : 'quotactl',
83         'pa' : 'prealloc'
84     }
85
86     def __init__(self):
87         self.args = None
88
89     def list_param(self, param_pattern): # pylint: disable=no-self-use
90         '''
91         list param paths with given param pattern
92         '''
93         cmd = ['lctl', 'list_param', param_pattern]
94         try:
95             output = subprocess.check_output(cmd).decode()
96             return output.splitlines()
97         except subprocess.CalledProcessError as err:
98             if err.returncode == errno.ENOENT:
99                 return []
100
101     def parse_single_job_stats(self, param): # pylint: disable=no-self-use
102         '''
103         read single job_stats file, parse it and return an object
104         '''
105         cmd = ['lctl', 'get_param', '-n', param]
106         output = subprocess.check_output(cmd).decode()
107         try:
108             yaml_obj = yaml.safe_load(output)  # need several seconds...
109         except yaml.scanner.ScannerError:
110             # only print the file name here
111             print("failed to parse the content of %s" % param, file=sys.stdout)
112             raise
113
114         return yaml_obj
115
116     def merge_job(self, jobs, job):
117         '''
118         merge stats data of job to jobs
119         '''
120         job2 = jobs.get(job['job_id'], {})
121
122         for key in job.keys():
123             if key not in self.op_keys.values():
124                 continue
125             if job[key]['samples'] == 0:
126                 continue
127
128             job2[key] = job2.get(key, 0) + job[key]['samples']
129             job2['ops'] = job2.get('ops', 0) + job[key]['samples']
130
131         job2['job_id'] = job['job_id']
132         jobs[job['job_id']] = job2
133
134     def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
135         '''
136         insert job to top_jobs in descending order by the key job['ops'].
137         top_jobs is an array with at most count elements
138         '''
139         top_jobs.append(job)
140
141         for i in range(len(top_jobs) - 2, -1, -1):
142             if job['ops'] > top_jobs[i]['ops']:
143                 top_jobs[i + 1] = top_jobs[i]
144                 top_jobs[i] = job
145             else:
146                 break
147
148         if len(top_jobs) > count:
149             top_jobs.pop()
150
151     def pick_top_jobs(self, jobs, count):
152         '''
153         choose at most count elements from jobs, put them in an array in
154         descending order by the key job['ops'].
155         '''
156         top_jobs = []
157         for _, job in jobs.items():
158             self.insert_job_sorted(top_jobs, count, job)
159
160         return top_jobs
161
162     def print_job(self, job):
163         '''
164         print single job
165         '''
166         print('- %-16s {' % (job['job_id'] + ':'), end='')
167         first = True
168         for key, val in self.op_keys.items():
169             if not val in job.keys():
170                 continue
171             if not first:
172                 print(", ", end='')
173
174             opname = key
175             if self.args.fullname:
176                 opname = self.op_keys[key]
177
178             print('%s: %d' % (opname, job[val]), end='')
179             if first:
180                 first = False
181         print('}')
182
183     def print_top_jobs(self, top_jobs):
184         '''
185         print top_jobs in YAML
186         '''
187         print('---') # mark the begining of YAML doc in stream
188         print("timestamp: %d" % int(time.time()))
189         print("top_jobs:")
190         for job in top_jobs:
191             self.print_job(job)
192         print('...') # mark the end of YAML doc in stream
193
194     def run_once(self):
195         '''
196         scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
197         '''
198         jobs = {}
199         for param in self.list_param(self.args.param):
200             obj = self.parse_single_job_stats(param)
201             if obj['job_stats'] is None:
202                 continue
203
204             for job in obj['job_stats']:
205                 self.merge_job(jobs, job)
206
207         top_jobs = self.pick_top_jobs(jobs, self.args.count)
208         self.print_top_jobs(top_jobs)
209
210     def run_once_retry(self):
211         '''
212         Call run_once. If run_once succeeds, return.
213         If run_once throws an exception, retry for few times.
214         '''
215         for i in range(2, -1, -1):  # 2, 1, 0
216             try:
217                 return self.run_once()
218             except: # pylint: disable=bare-except
219                 if i == 0:
220                     raise
221
222     def run(self):
223         '''
224         run task periodically or for some times with given interval
225         '''
226         argparser = ArgParser()
227         argparser.run()
228         self.args = argparser.args
229
230         i = 0
231         while True:
232             self.run_once_retry()
233             i += 1
234             if self.args.repeats != -1 and i >= self.args.repeats:
235                 break
236             time.sleep(self.args.interval)
237
238
239 if __name__ == "__main__":
240     JobStatsParser().run()