'''
lljobstat command. Read job_stats files, parse and aggregate data of every
job, then show the top jobs.
'''

import argparse
import errno
import subprocess
import sys
import time

import yaml

class ArgParser:  # pylint: disable=too-few-public-methods
    '''
    Class to define lljobstat command arguments
    and parse the real command line arguments.
    '''

    def run(self):
        '''
        define and parse arguments
        '''
        parser = argparse.ArgumentParser(prog='lljobstat',
                                         description='List top jobs.')
        parser.add_argument('-c', '--count', type=int, default=5,
                            help='the number of top jobs to be listed (default 5).')
        parser.add_argument('-i', '--interval', type=int, default=10,
                            help='the interval in seconds to check job stats again (default 10).')
        parser.add_argument('-n', '--repeats', type=int, default=-1,
                            help='the number of times to repeat the parsing (default unlimited).')
        parser.add_argument('--param', type=str, default='*.*.job_stats',
                            help='the param path to be checked (default *.*.job_stats).')
        parser.add_argument('-o', '--ost', dest='param', action='store_const',
                            const='obdfilter.*.job_stats',
                            help='check only OST job stats.')
        parser.add_argument('-m', '--mdt', dest='param', action='store_const',
                            const='mdt.*.job_stats',
                            help='check only MDT job stats.')
        parser.add_argument('--fullname', action='store_true', default=False,
                            help='show the full operation name (default False).')
        parser.add_argument('--no-fullname', dest='fullname',
                            action='store_false',
                            help='show the abbreviated operation name.')

        self.args = parser.parse_args()

class JobStatsParser:
    '''
    Class to get/parse/aggregate/sort/print top jobs in job_stats
    '''
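    # Sketch of the operation-name map the methods below rely on: it maps a
    # short display name to the corresponding counter name in job_stats. The
    # full table in the original file covers every metadata and I/O operation;
    # the entries listed here are an illustrative subset only.
    op_keys = {
        'ops': 'ops',
        'op': 'open',
        'cl': 'close',
        'ga': 'getattr',
        'sa': 'setattr',
        'sy': 'sync',
        'rd': 'read_bytes',
        'wr': 'write_bytes',
        'pu': 'punch',
    }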

    def list_param(self, param_pattern):  # pylint: disable=no-self-use
        '''
        list param paths with given param pattern
        '''
        cmd = ['lctl', 'list_param', param_pattern]
        try:
            output = subprocess.check_output(cmd).decode()
            return output.splitlines()
        except subprocess.CalledProcessError as err:
            # a missing param path simply means there is nothing to report
            if err.returncode == errno.ENOENT:
                return []
            raise
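
    # Typical 'lctl list_param' output consumed by run_once() below; target
    # names depend on the local filesystem and are shown only as examples:
    #   obdfilter.testfs-OST0000.job_stats
    #   mdt.testfs-MDT0000.job_stats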

    def parse_single_job_stats(self, param):  # pylint: disable=no-self-use
        '''
        read single job_stats file, parse it and return an object
        '''
        cmd = ['lctl', 'get_param', '-n', param]
        output = subprocess.check_output(cmd).decode()
        try:
            yaml_obj = yaml.safe_load(output)  # may take several seconds on a large file
        except yaml.scanner.ScannerError:
            # only print the file name here, not the (possibly huge) content
            print("failed to parse the content of %s" % param, file=sys.stderr)
            raise

        return yaml_obj

    def merge_job(self, jobs, job):
        '''
        merge stats data of job to jobs
        '''
        job2 = jobs.get(job['job_id'], {})

        for key in job.keys():
            if key not in self.op_keys.values():
                continue
            if job[key]['samples'] == 0:
                continue

            job2[key] = job2.get(key, 0) + job[key]['samples']
            job2['ops'] = job2.get('ops', 0) + job[key]['samples']

        job2['job_id'] = job['job_id']
        jobs[job['job_id']] = job2
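
    # Aggregation example (illustrative job id and counters): if job 'dd.0'
    # reports write_bytes samples of 4 on one target and 6 on another, the
    # merged entry becomes {'job_id': 'dd.0', 'write_bytes': 10, 'ops': 10}.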

    def insert_job_sorted(self, top_jobs, count, job):  # pylint: disable=no-self-use
        '''
        insert job into top_jobs in descending order by the key job['ops'].
        top_jobs is an array with at most count elements
        '''
        top_jobs.append(job)
        # bubble the new job towards the front until the order is restored
        for i in range(len(top_jobs) - 2, -1, -1):
            if job['ops'] > top_jobs[i]['ops']:
                top_jobs[i + 1] = top_jobs[i]
                top_jobs[i] = job
            else:
                break

        # keep at most count elements
        if len(top_jobs) > count:
            top_jobs.pop()
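
    # Example (illustrative): with count=2 and top_jobs == [{'ops': 9}, {'ops': 4}],
    # inserting {'ops': 7} leaves top_jobs == [{'ops': 9}, {'ops': 7}] and the
    # {'ops': 4} entry falls off the end.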

    def pick_top_jobs(self, jobs, count):
        '''
        choose at most count elements from jobs, put them in an array in
        descending order by the key job['ops'].
        '''
        top_jobs = []
        for _, job in jobs.items():
            self.insert_job_sorted(top_jobs, count, job)

        return top_jobs

    def print_job(self, job):
        '''
        print a single job as one YAML flow-mapping line
        '''
        print('- %-16s {' % (job['job_id'] + ':'), end='')
        first = True
        for key, val in self.op_keys.items():
            if val not in job.keys():
                continue

            if not first:
                print(", ", end='')

            opname = key
            if self.args.fullname:
                opname = self.op_keys[key]
            print('%s: %d' % (opname, job[val]), end='')
            first = False
        print('}')

    def print_top_jobs(self, top_jobs):
        '''
        print top_jobs in YAML
        '''
        print('---')  # mark the beginning of the YAML doc in the stream
        print("timestamp: %d" % int(time.time()))
        print("top_jobs:")
        for job in top_jobs:
            self.print_job(job)
        print('...')  # mark the end of the YAML doc in the stream
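
    # Example of one emitted YAML document (job ids and counters illustrative):
    #   ---
    #   timestamp: 1712345678
    #   top_jobs:
    #   - dd.0:           {ops: 16, wr: 12, pu: 2, sy: 2}
    #   - rm.500:         {ops: 4, ga: 3, sa: 1}
    #   ...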

    def run_once(self):
        '''
        scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
        '''
        jobs = {}

        for param in self.list_param(self.args.param):
            obj = self.parse_single_job_stats(param)
            if obj['job_stats'] is None:
                continue

            for job in obj['job_stats']:
                self.merge_job(jobs, job)

        top_jobs = self.pick_top_jobs(jobs, self.args.count)
        self.print_top_jobs(top_jobs)

    def run_once_retry(self):
        '''
        Call run_once. If run_once succeeds, return.
        If run_once throws an exception, retry a few times.
        '''
        for i in range(2, -1, -1):  # 2, 1, 0
            try:
                return self.run_once()
            except:  # pylint: disable=bare-except
                # re-raise on the last attempt, otherwise retry silently
                if i == 0:
                    raise

    def run(self):
        '''
        run the task periodically, or for the given number of repeats, with
        the given interval
        '''
        argparser = ArgParser()
        argparser.run()
        self.args = argparser.args

        i = 0
        while True:
            self.run_once_retry()
            i += 1
            if self.args.repeats != -1 and i >= self.args.repeats:
                break
            time.sleep(self.args.interval)

if __name__ == "__main__":
    JobStatsParser().run()
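
# Example invocations (illustrative; actual output depends on local job_stats):
#   lljobstat                  # top 5 jobs across all targets, every 10 seconds
#   lljobstat -c 10 -i 30      # top 10 jobs, refreshed every 30 seconds
#   lljobstat -m -n 1          # scan MDT job_stats once, print, and exit
#   lljobstat -o --fullname    # OST stats only, with full operation names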