Whamcloud - gitweb
LU-17630 osc: add cond_resched() to osc_lru_shrink()
[fs/lustre-release.git] / lustre / utils / lljobstat
1 #!/usr/bin/python3
2
3 '''
4 lljobstat command. Read job_stats files, parse and aggregate data of every
5 job, show top jobs
6 '''
7
8 import argparse
9 import errno
10 import signal
11 import subprocess
12 import sys
13 import time
14 import yaml
15 try:
16     from yaml import CLoader as Loader
17 except ImportError:
18     from yaml import Loader
19
20 def exit_silently(signal, frame):
21     sys.exit(0)
22
23 class ArgParser: # pylint: disable=too-few-public-methods
24     '''
25     Class to define lljobstat command arguments
26     and parse the real command line arguments.
27     '''
28     def __init__(self):
29         self.args = None
30
31     def run(self):
32         '''
33         define and parse arguments
34         '''
35         parser = argparse.ArgumentParser(prog='lljobstat',
36                                          description='List top jobs.',
37                                          formatter_class=argparse.RawTextHelpFormatter)
38         parser.add_argument('-c', '--count', type=int, default=5,
39                             help='the number of top jobs to be listed (default 5).')
40         parser.add_argument('-i', '--interval', type=int, default=10,
41                             help='the interval in seconds to check job stats again (default 10).')
42         parser.add_argument('-n', '--repeats', type=int, default=-1,
43                             help='the times to repeat the parsing (default unlimited).')
44         parser.add_argument('--param', type=str, default='*.*.job_stats',
45                             help='the param path to be checked (default *.*.job_stats).')
46         parser.add_argument('-o', '--ost', dest='param', action='store_const',
47                             const='obdfilter.*.job_stats',
48                             help='check only OST job stats.')
49         parser.add_argument('-m', '--mdt', dest='param', action='store_const',
50                             const='mdt.*.job_stats',
51                             help='check only MDT job stats.')
52         parser.add_argument('--fullname', action='store_true', default=False,
53                             help='show full operation name (default False).')
54         parser.add_argument('--no-fullname', dest='fullname',
55                             action='store_false',
56                             help='show abbreviated operations name.')
57         parser.add_argument('--statsfile', type=str, action='append',
58                             help="Parse specified file instead of job_stats files in system.\n"
59                                  "Usually the file is generate by command:\n"
60                                  "# lctl get_param *.*.job_stats > job_stats.txt\n"
61                                  "Multiple files can be specified by specify --statsfile for\n"
62                                  "multiple times. For example:\n"
63                                  "# lljobstats --statsfile=1.txt --statsfile=2.txt\n"
64                                  "The stats data of jobs from multiple statsfiles will be\n"
65                                  "added up and sorted. Then top jobs are listed.\n")
66
67         self.args = parser.parse_args()
68
69
70 class JobStatsParser:
71     '''
72     Class to get/parse/aggregate/sort/print top jobs in job_stats
73     '''
74     op_keys = {
75         'ops': 'ops',
76         'cr' : 'create',
77         'op' : 'open',
78         'cl' : 'close',
79         'mn' : 'mknod',
80         'ln' : 'link',
81         'ul' : 'unlink',
82         'mk' : 'mkdir',
83         'rm' : 'rmdir',
84         'mv' : 'rename',
85         'ga' : 'getattr',
86         'sa' : 'setattr',
87         'gx' : 'getxattr',
88         'sx' : 'setxattr',
89         'st' : 'statfs',
90         'sy' : 'sync',
91         'rd' : 'read',
92         'wr' : 'write',
93         'pu' : 'punch',
94         'mi' : 'migrate',
95         'fa' : 'fallocate',
96         'dt' : 'destroy',
97         'gi' : 'get_info',
98         'si' : 'set_info',
99         'qc' : 'quotactl',
100         'pa' : 'prealloc'
101     }
102
103     def __init__(self):
104         self.args = None
105
106     def list_param(self, param_pattern): # pylint: disable=no-self-use
107         '''
108         list param paths with given param pattern
109         '''
110         cmd = ['lctl', 'list_param', param_pattern]
111         try:
112             output = subprocess.check_output(cmd).decode()
113             return output.splitlines()
114         except subprocess.CalledProcessError as err:
115             if err.returncode == errno.ENOENT:
116                 return []
117
118     def parse_single_job_stats(self, param=None, string=None): # pylint: disable=no-self-use
119         '''
120         read single job_stats file or a string, parse it and return an object
121         '''
122         output = ""
123         if string is not None:
124             output = string
125         else:
126             cmd = ['lctl', 'get_param', '-n', param]
127             output = subprocess.check_output(cmd).decode()
128
129         try:
130             yaml_obj = yaml.load(output, Loader=Loader)  # need several seconds...
131         except yaml.scanner.ScannerError:
132             # only print the file name here
133             if param != None:
134                 print("failed to parse the content of %s" % param, file=sys.stdout)
135             if string != None:
136                 print("failed to parse the string", file=sys.stdout)
137             raise
138
139         return yaml_obj
140
141     def merge_job(self, jobs, job):
142         '''
143         merge stats data of job to jobs
144         '''
145         job2 = jobs.get(job['job_id'], {})
146
147         for key in job.keys():
148             if key not in self.op_keys.values():
149                 continue
150             if job[key]['samples'] == 0:
151                 continue
152
153             job2[key] = job2.get(key, 0) + job[key]['samples']
154             job2['ops'] = job2.get('ops', 0) + job[key]['samples']
155
156         job2['job_id'] = job['job_id']
157         jobs[job['job_id']] = job2
158
159     def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
160         '''
161         insert job to top_jobs in descending order by the key job['ops'].
162         top_jobs is an array with at most count elements
163         '''
164         top_jobs.append(job)
165
166         for i in range(len(top_jobs) - 2, -1, -1):
167             if job['ops'] > top_jobs[i]['ops']:
168                 top_jobs[i + 1] = top_jobs[i]
169                 top_jobs[i] = job
170             else:
171                 break
172
173         if len(top_jobs) > count:
174             top_jobs.pop()
175
176     def pick_top_jobs(self, jobs, count):
177         '''
178         choose at most count elements from jobs, put them in an array in
179         descending order by the key job['ops'].
180         '''
181         top_jobs = []
182         for _, job in jobs.items():
183             self.insert_job_sorted(top_jobs, count, job)
184
185         return top_jobs
186
187     def print_job(self, job):
188         '''
189         print single job
190         '''
191         print('- %-16s {' % (str(job['job_id']) + ':'), end='')
192         first = True
193         for key, val in self.op_keys.items():
194             if not val in job.keys():
195                 continue
196             if not first:
197                 print(", ", end='')
198
199             opname = key
200             if self.args.fullname:
201                 opname = self.op_keys[key]
202
203             print('%s: %d' % (opname, job[val]), end='')
204             if first:
205                 first = False
206         print('}')
207
208     def print_top_jobs(self, top_jobs):
209         '''
210         print top_jobs in YAML
211         '''
212         print('---') # mark the begining of YAML doc in stream
213         print("timestamp: %d" % int(time.time()))
214         print("top_jobs:")
215         for job in top_jobs:
216             self.print_job(job)
217         print('...') # mark the end of YAML doc in stream
218
219     def parse_file(self, file, jobs):
220         '''
221         parse a single file. the file may be a combination of several job_stats
222         '''
223         job_stats_str = ""
224         started = False
225         with open(file) as file_handler:
226             for line in file_handler:
227                 # start from "job_stats:"
228                 if not started:
229                     if line[0:10] == "job_stats:":
230                         started = True
231                         job_stats_str += line
232                     continue
233
234                 if line == "\n" or line[0:2] == "  " or line[0:2] == "- ":
235                     # for empty line, "  ", or "- ", append current line
236                     job_stats_str += line
237                 else:
238                     # end current segment
239                     obj = self.parse_single_job_stats(string=job_stats_str)
240                     if obj != None and obj['job_stats'] != None:
241                         for job in obj['job_stats']:
242                             self.merge_job(jobs, job)
243
244                     if line[0:10] == "job_stats:":
245                         # start of next segment
246                         job_stats_str = line
247                     else:
248                         job_stats_str = ""
249                         started = False
250             else:
251                 obj = self.parse_single_job_stats(string=job_stats_str)
252                 if obj != None and obj['job_stats'] != None:
253                     for job in obj['job_stats']:
254                         self.merge_job(jobs, job)
255
256     def run_once(self):
257         '''
258         scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
259         '''
260         jobs = {}
261
262         if self.args.statsfile:
263             for file in self.args.statsfile:
264                 self.parse_file(file, jobs)
265         else:
266             for param in self.list_param(self.args.param):
267                 obj = self.parse_single_job_stats(param=param)
268                 if obj is not None and obj['job_stats'] is not None:
269                     for job in obj['job_stats']:
270                         self.merge_job(jobs, job)
271
272         top_jobs = self.pick_top_jobs(jobs, self.args.count)
273         self.print_top_jobs(top_jobs)
274
275     def run_once_retry(self):
276         '''
277         Call run_once. If run_once succeeds, return.
278         If run_once throws an exception, retry for few times.
279         '''
280         for i in range(2, -1, -1):  # 2, 1, 0
281             try:
282                 return self.run_once()
283             except: # pylint: disable=bare-except
284                 if i == 0:
285                     raise
286
287     def run(self):
288         '''
289         run task periodically or for some times with given interval
290         '''
291         argparser = ArgParser()
292         argparser.run()
293         self.args = argparser.args
294
295         if self.args.statsfile:
296             self.run_once()
297             return
298
299         # exit silently if Ctrl+C is pressed in the loop below
300         signal.signal(signal.SIGINT, exit_silently)
301
302         i = 0
303         while True:
304             self.run_once_retry()
305             i += 1
306             if self.args.repeats != -1 and i >= self.args.repeats:
307                 break
308             time.sleep(self.args.interval)
309
310
311 if __name__ == "__main__":
312     JobStatsParser().run()