4 lljobstat command. Read job_stats files, parse and aggregate data of every
16 from yaml import CLoader as Loader
18 from yaml import Loader
def exit_silently(signal, frame): # pylint: disable=unused-argument,redefined-outer-name
    '''
    Signal handler that ends the process with exit status 0.

    signal, frame: supplied by the signal-handler calling convention; both
                   are ignored. Inside this function the first parameter
                   shadows the ``signal`` module.

    Raises SystemExit(0).
    '''
    sys.exit(0)
class ArgParser: # pylint: disable=too-few-public-methods
    '''
    Class to define lljobstat command arguments
    and parse the real command line arguments.

    After run() returns, the parsed values are available in self.args.
    '''
    def __init__(self):
        # filled in by run()
        self.args = None

    def run(self, argv=None):
        '''
        define and parse arguments

        argv: optional list of argument strings to parse. When None
              (the default) argparse reads sys.argv[1:].

        The result is stored in self.args; nothing is returned.
        '''
        parser = argparse.ArgumentParser(prog='lljobstat',
                                         description='List top jobs.',
                                         formatter_class=argparse.RawTextHelpFormatter)
        parser.add_argument('-c', '--count', type=int, default=5,
                            help='the number of top jobs to be listed (default 5).')
        parser.add_argument('-i', '--interval', type=int, default=10,
                            help='the interval in seconds to check job stats again (default 10).')
        parser.add_argument('-n', '--repeats', type=int, default=-1,
                            help='the times to repeat the parsing (default unlimited).')
        # --param, -o and -m all store into 'param'; whichever appears last
        # on the command line wins.
        parser.add_argument('--param', type=str, default='*.*.job_stats',
                            help='the param path to be checked (default *.*.job_stats).')
        parser.add_argument('-o', '--ost', dest='param', action='store_const',
                            const='obdfilter.*.job_stats',
                            help='check only OST job stats.')
        parser.add_argument('-m', '--mdt', dest='param', action='store_const',
                            const='mdt.*.job_stats',
                            help='check only MDT job stats.')
        # --fullname and --no-fullname toggle the same 'fullname' flag
        parser.add_argument('--fullname', action='store_true', default=False,
                            help='show full operation name (default False).')
        parser.add_argument('--no-fullname', dest='fullname',
                            action='store_false',
                            help='show abbreviated operations name.')
        parser.add_argument('--statsfile', type=str, action='append',
                            help="Parse specified file instead of job_stats files in system.\n"
                                 "Usually the file is generated by command:\n"
                                 "# lctl get_param *.*.job_stats > job_stats.txt\n"
                                 "Multiple files can be specified by specifying --statsfile\n"
                                 "multiple times. For example:\n"
                                 "# lljobstat --statsfile=1.txt --statsfile=2.txt\n"
                                 "The stats data of jobs from multiple statsfiles will be\n"
                                 "added up and sorted. Then top jobs are listed.\n")

        self.args = parser.parse_args(argv)
72 Class to get/parse/aggregate/sort/print top jobs in job_stats
def list_param(self, param_pattern): # pylint: disable=no-self-use
    '''
    list param paths with given param pattern

    Runs "lctl list_param <param_pattern>" and returns the command output
    split into a list of lines.

    An exit status equal to errno.ENOENT yields an empty list (presumably
    "no parameter matched" - confirm against lctl). Any other non-zero
    exit status is re-raised as subprocess.CalledProcessError rather than
    silently returning None, which callers would then fail to iterate.
    '''
    cmd = ['lctl', 'list_param', param_pattern]
    try:
        output = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as err:
        if err.returncode == errno.ENOENT:
            return []
        raise
    return output.splitlines()
def parse_single_job_stats(self, param=None, string=None): # pylint: disable=no-self-use
    '''
    read single job_stats file or a string, parse it and return an object

    param:  param path whose content is read with "lctl get_param -n";
            only used when string is None.
    string: YAML text to parse directly; takes precedence over param.

    Returns the object produced by yaml.load() for the text. If the text
    is not valid YAML, a short message naming the source is printed and
    the YAML error is re-raised.
    '''
    if string is not None:
        output = string
    else:
        cmd = ['lctl', 'get_param', '-n', param]
        output = subprocess.check_output(cmd).decode()

    try:
        # NOTE(review): yaml.load() with a non-safe Loader can construct
        # arbitrary objects; fine for job_stats produced by the local
        # system, but reconsider if --statsfile input may be untrusted.
        yaml_obj = yaml.load(output, Loader=Loader) # need several seconds...
    except yaml.YAMLError:
        # YAMLError covers scanner and parser errors alike.
        # only print the file name here
        if param is not None:
            print("failed to parse the content of %s" % param, file=sys.stdout)
        if string is not None:
            print("failed to parse the string", file=sys.stdout)
        raise

    return yaml_obj
def merge_job(self, jobs, job):
    '''
    merge stats data of job to jobs

    jobs: dict mapping job_id to the aggregated counters of that job;
          updated in place.
    job:  one job_stats entry. It must contain 'job_id'; every key that
          is one of self.op_keys.values() must map to a dict holding a
          'samples' count. All other keys are ignored.

    For each operation with a non-zero sample count, the count is added to
    the job's total for that operation and to its overall 'ops' total.
    '''
    job_id = job['job_id']
    merged = jobs.get(job_id, {})
    # build the lookup set once instead of scanning the values per key
    op_names = set(self.op_keys.values())

    for key, stats in job.items():
        if key not in op_names:
            continue
        samples = stats['samples']
        if samples == 0:
            continue

        merged[key] = merged.get(key, 0) + samples
        merged['ops'] = merged.get('ops', 0) + samples

    merged['job_id'] = job_id
    jobs[job_id] = merged
def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
    '''
    insert job to top_jobs in descending order by the key job['ops'].
    top_jobs is an array with at most count elements

    top_jobs is modified in place and is expected to be sorted already.
    job is placed after any existing entries with the same 'ops' value.
    A job without an 'ops' key (no non-zero samples were merged for it)
    counts as 0 operations instead of raising KeyError. If the list then
    holds more than count elements, its last element is dropped.
    '''
    ops = job.get('ops', 0)

    # walk back from the tail past every entry that job outranks
    pos = len(top_jobs)
    while pos > 0 and ops > top_jobs[pos - 1].get('ops', 0):
        pos -= 1
    top_jobs.insert(pos, job)

    if len(top_jobs) > count:
        top_jobs.pop()
def pick_top_jobs(self, jobs, count):
    '''
    choose at most count elements from jobs, put them in an array in
    descending order by the key job['ops'].

    jobs:  dict mapping job_id to the aggregated job dict.
    count: maximum number of jobs to return; 0 or less gives [].

    Jobs with equal 'ops' keep their order of appearance in jobs, and a
    job without an 'ops' key counts as 0 operations. Returns a new list;
    jobs itself is not modified.
    '''
    import heapq # pylint: disable=import-outside-toplevel

    # nlargest() is documented as equivalent to
    # sorted(iterable, key=key, reverse=True)[:n], so ties stay stable.
    return heapq.nlargest(count, jobs.values(),
                          key=lambda job: job.get('ops', 0))
def print_job(self, job):
    '''
    print single job

    Writes one line to stdout: a list item made of the job id padded to
    16 columns, followed by a brace-enclosed, comma-separated list of
    "name: count" pairs. Operations are emitted in the order of
    self.op_keys and only when the job has a counter for them. The name
    is the full operation name when self.args.fullname is set, otherwise
    the abbreviated key of self.op_keys.
    '''
    use_fullname = self.args.fullname
    fields = ['%s: %d' % (fullname if use_fullname else abbrev, job[fullname])
              for abbrev, fullname in self.op_keys.items()
              if fullname in job]

    print('- %-16s {%s}' % (str(job['job_id']) + ':', ', '.join(fields)))
def print_top_jobs(self, top_jobs):
    '''
    print top_jobs in YAML

    Emits one YAML document on stdout: the '---' start marker, the current
    time as an integer timestamp, a 'top_jobs:' key followed by one
    print_job() line per job in the given order, and the '...' end marker.
    '''
    print('---') # mark the beginning of YAML doc in stream
    print("timestamp: %d" % int(time.time()))
    print('top_jobs:')
    for job in top_jobs:
        self.print_job(job)
    print('...') # mark the end of YAML doc in stream
def parse_file(self, file, jobs):
    '''
    parse a single file. the file may be a combination of several job_stats

    file: path of a text file that contains one or more segments, each
          starting with a line beginning with "job_stats:".
    jobs: dict of aggregated jobs; updated in place through merge_job().

    Lines before the first "job_stats:" line are skipped. Within a
    segment, empty lines and lines starting with two spaces or "- " belong
    to the segment; any other line ends it. Each finished segment is
    parsed with parse_single_job_stats() and its jobs are merged.
    '''
    def merge_segment(text):
        # parse one complete "job_stats:" segment and merge its jobs
        obj = self.parse_single_job_stats(string=text)
        if obj is not None and obj['job_stats'] is not None:
            for job in obj['job_stats']:
                self.merge_job(jobs, job)

    # lines of the segment being collected; empty while outside a segment
    segment = []
    with open(file) as file_handler:
        for line in file_handler:
            # the continuation prefix is two characters wide
            if segment and (line == "\n" or line.startswith(("  ", "- "))):
                # for empty line, "  ", or "- ", append current line
                segment.append(line)
                continue

            # any other line ends the current segment, if there is one
            if segment:
                merge_segment(''.join(segment))

            # start from "job_stats:"; anything else is skipped
            segment = [line] if line.startswith("job_stats:") else []

    if segment:
        merge_segment(''.join(segment))
def run_once(self):
    '''
    scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)

    When self.args.statsfile is set, every listed file is parsed with
    parse_file(). Otherwise every param path matching self.args.param is
    read and parsed. The merged jobs are then reduced to the top
    self.args.count entries and printed.
    '''
    jobs = {}

    if self.args.statsfile:
        for file in self.args.statsfile:
            self.parse_file(file, jobs)
    else:
        # list_param() may return None; treat that as "nothing to scan"
        for param in self.list_param(self.args.param) or []:
            obj = self.parse_single_job_stats(param=param)
            if obj is not None and obj['job_stats'] is not None:
                for job in obj['job_stats']:
                    self.merge_job(jobs, job)

    top_jobs = self.pick_top_jobs(jobs, self.args.count)
    self.print_top_jobs(top_jobs)
def run_once_retry(self):
    '''
    Call run_once. If run_once succeeds, return.
    If run_once throws an exception, retry for few times.

    run_once() is attempted up to three times and the exception of the
    last failed attempt is re-raised. Only Exception subclasses are
    retried: SystemExit and KeyboardInterrupt propagate immediately, so a
    Ctrl+C handler that exits the process is not swallowed by a retry.
    '''
    for i in range(2, -1, -1): # 2, 1, 0
        try:
            return self.run_once()
        except Exception: # pylint: disable=broad-except
            if i == 0:
                raise
    return None
def run(self):
    '''
    run task periodically or for some times with given interval

    Parses the command line into self.args. With --statsfile the files
    are processed once and the method returns. Otherwise run_once_retry()
    is called in a loop, sleeping self.args.interval seconds between
    rounds, until self.args.repeats rounds are done (-1 means forever).
    '''
    argparser = ArgParser()
    argparser.run()
    self.args = argparser.args

    if self.args.statsfile:
        self.run_once()
        return

    # exit silently if Ctrl+C is pressed in the loop below
    signal.signal(signal.SIGINT, exit_silently)

    i = 0
    while True:
        self.run_once_retry()
        i += 1
        # checked after the round, so at least one round always runs
        if self.args.repeats != -1 and i >= self.args.repeats:
            break
        time.sleep(self.args.interval)
# script entry point: parse the command line and print the top jobs
if __name__ == "__main__":
    JobStatsParser().run()