1 from pathlib import Path
2 from clutf_agent import *
3 from lutf_common_def import *
4 import lutf_common_def as common
5 from lutf_exception import LUTFError, LutfDumper
6 from lutf_cmd import lutf_exec_local_cmd
7 import importlib, socket
11 import os, subprocess, sys, yaml, fnmatch, logging, csv
12 import shutil, traceback, datetime, re, copy
18 def __init__(self, y=None):
19 if y is not None and (type(y) is not dict and type(y) is not list):
20 raise LUTFError('This class takes dictionaries or lists only')
27 return yaml.dump(self.__yaml)
29 def load(self, stream):
31 raise LUTFError('There exists a YAML instance')
32 self.__yaml = yaml.load(stream, Loader=yaml.FullLoader)
43 def __setitem__(self, key, value):
44 for i, e in enumerate(self.__results):
45 if e.get()['name'] == key:
47 self.__results[i] = LutfYaml(value)
50 self.__results.append(LutfYaml(value))
51 self.__max = len(self.__results)
53 def __getitem__(self, key):
54 for entry in self.__results:
55 if entry.get()['name'] == key:
63 # needed for python 3.x
65 if self.__n < self.__max:
66 rc = self.__results[self.__n]
68 return rc['name'], rc.get()
72 def get(self, status=None):
74 for entry in self.__results:
76 if status and type(status) == str:
77 if e['status'] != status.upper():
79 shadow.append(entry.get())
82 # subtest_result = YamlResults
83 # global_test_resutls['lutf-dlc']['script-name'] = rc
84 class YamlGlobalTestResults:
85 def __init__(self, desc=None):
87 self.desc = 'auster lutf'
88 self.__results = {'Tests': []}
92 def __setitem__(self, key, value):
93 if type(value) != dict:
94 raise TypeError("This class only takes dict type")
95 for i, e in enumerate(self.__results['Tests']):
97 self.__results['Tests'][i]['SubTests'][value['name']] = value
100 lutf = {'name': key, 'description': self.desc, 'SubTests': YamlResults()}
101 lutf['SubTests'][value['name']] = value
102 self.__results['Tests'].append(lutf)
103 self.__max = len(self.__results['Tests'])
106 def __getitem__(self, key):
107 for entry in self.__results['Tests']:
108 if entry['name'] == key:
109 return entry['SubTests']
116 # needed for python 3.x
118 if self.__n < self.__max:
119 rc = self.__results['Tests'][self.__n]
121 return rc['name'], rc
125 def finalize(self, name):
126 timefmt = datetime.datetime.utcnow().strftime('%a %b %d %H:%M:%S UTC %Y')
127 for e in self.__results['Tests']:
128 if e['name'] == name:
131 subs = e['SubTests'].get()
133 total_duration += r['duration']
134 if r['status'] == 'FAIL':
136 e['duration'] = total_duration
137 # TODO: Pass the LUTF for now until we clean up the tests
139 e['status'] = sstatus
140 e['submission'] = timefmt
143 rc = copy.deepcopy(self.__results)
144 for t in rc['Tests']:
145 t['SubTests'] = t['SubTests'].get()
def __init__(self, base_name):
    """Set up the three documentation CSV files derived from ``base_name``.

    The files live in the ``documentation`` directory under the LUTF path
    and are named after ``base_name`` (extension removed) with the
    suffixes ``_req.csv``, ``_hld.csv`` and ``_tp.csv``.  The directory is
    created if needed and each file's header writer is invoked.
    """
    doc_path = os.path.join(clutf_global.get_lutf_path(), 'documentation')
    Path(doc_path).mkdir(parents=True, exist_ok=True)

    def csv_path(suffix):
        # Path of one CSV file: <lutf path>/documentation/<stem><suffix>
        stem = os.path.splitext(base_name)[0]
        return os.path.join(clutf_global.get_lutf_path(), 'documentation',
                            stem+suffix)

    self.__req = csv_path('_req.csv')
    self.__hld = csv_path('_hld.csv')
    self.__tp = csv_path('_tp.csv')

    # Headers are written in the same order the paths were assigned.
    self.__req_writeheader()
    self.__hld_writeheader()
    self.__tp_writeheader()
def __req_writeheader(self):
    """Create the requirements CSV with its header row if it does not exist.

    An existing file is left untouched.
    """
    if os.path.isfile(self.__req):
        return
    columns = ["Test Case ID", "Requirement Id", "Requirement Description"]
    with open(self.__req, 'w') as out:
        csv.writer(out).writerow(columns)
def __hld_writeheader(self):
    """Create the design-notes (HLD) CSV with its header row if missing.

    The existence check is made against the HLD file itself.  Checking the
    requirements file instead would skip the header whenever the
    requirements CSV already exists, leaving the HLD file without one.
    An existing HLD file is left untouched.
    """
    if os.path.isfile(self.__hld):
        return
    header = ["Test Case ID", "Requirement Id", "Design Notes"]
    with open(self.__hld, 'w') as fcsv:
        csv.writer(fcsv).writerow(header)
def __tp_writeheader(self):
    """Create the test-plan CSV with its header row if missing.

    The existence check is made against the test-plan file itself.
    Checking the requirements file instead would skip the header whenever
    the requirements CSV already exists, leaving the test-plan file
    without one.  An existing test-plan file is left untouched.
    """
    if os.path.isfile(self.__tp):
        return
    header = ["Test Case ID", "Primary Requirement Id", "Secondary Requirement Id", "Test Case"]
    with open(self.__tp, 'w') as fcsv:
        csv.writer(fcsv).writerow(header)
def req_writerow(self, req_id, req_desc, fname):
    """Append one record to the requirements CSV.

    The row is written in column order: ``fname``, ``req_id``, ``req_desc``.
    """
    record = [fname, req_id, req_desc]
    with open(self.__req, 'a+') as out:
        csv.writer(out).writerow(record)
def hld_writerow(self, req_id, design, fname):
    """Append one record to the design-notes (HLD) CSV.

    The row is written in column order: ``fname``, ``req_id``, ``design``.
    """
    record = [fname, req_id, design]
    with open(self.__hld, 'a+') as out:
        csv.writer(out).writerow(record)
def tp_writerow(self, preq_id, sreq_id, tc, fname):
    """Append one record to the test-plan CSV.

    The row is written in column order: ``fname``, ``preq_id``,
    ``sreq_id``, ``tc``.
    """
    record = [fname, preq_id, sreq_id, tc]
    with open(self.__tp, 'a+') as out:
        csv.writer(out).writerow(record)
def __init__(self, abs_path, callbacks, parent_suite, collection):
    """Record where a test script lives and which suite owns it.

    ``name`` becomes the script's file name with directory and extension
    removed.  The parent suite is stored with every ``'suite_'`` substring
    removed.
    """
    file_name = os.path.basename(abs_path)
    self.name, _extension = os.path.splitext(file_name)
    self.__abs_path = abs_path
    self.__callbacks = callbacks
    suite_label = parent_suite.replace('suite_', '')
    self.__parent_suite = suite_label
    self.__collection = collection
def is_expected_failure(self, name):
    """Ask the owning collection whether ``name`` is an expected failure."""
    owner = self.__collection
    return owner.in_expected_failures_list(name)
209 def create_docs(self, csvfile):
210 # open script and extract comment block. It is expected to
211 # be at the beginning of the file
214 with open(self.__abs_path, 'r') as f:
215 lines = f.readlines()
217 if len(l.strip()) > 0 and l.strip() == '"""':
224 doc.append(l.strip())
228 meta = {'prim': {'txt': [], 'st': False},
229 'primd': {'txt': [], 'st': False},
230 'sec': {'txt': [], 'st': False},
231 'des': {'txt': [], 'st': False},
232 'tc': {'txt': [], 'st': False}}
236 meta['prim']['st'] = True
237 meta['primd']['st'] = False
238 meta['sec']['st'] = False
239 meta['des']['st'] = False
240 meta['tc']['st'] = False
241 meta['prim']['txt'].append(l.split('@PRIMARY:')[1].strip())
242 elif '@PRIMARY_DESC:' in l:
243 meta['prim']['st'] = False
244 meta['primd']['st'] = True
245 meta['sec']['st'] = False
246 meta['des']['st'] = False
247 meta['tc']['st'] = False
248 meta['primd']['txt'].append(l.split('@PRIMARY_DESC:')[1].strip())
249 elif '@SECONDARY:' in l:
250 meta['prim']['st'] = False
251 meta['primd']['st'] = False
252 meta['sec']['st'] = True
253 meta['des']['st'] = False
254 meta['tc']['st'] = False
255 meta['sec']['txt'].append(l.split('@SECONDARY:')[1].strip())
256 elif '@DESIGN:' in l:
257 meta['prim']['st'] = False
258 meta['primd']['st'] = False
259 meta['sec']['st'] = False
260 meta['des']['st'] = True
261 meta['tc']['st'] = False
262 meta['des']['txt'].append(l.split('@DESIGN:')[1].strip())
263 elif '@TESTCASE:' in l:
264 meta['prim']['st'] = False
265 meta['primd']['st'] = False
266 meta['sec']['st'] = False
267 meta['des']['st'] = False
268 meta['tc']['st'] = True
269 meta['tc']['txt'].append(l.split('@TESTCASE:')[1].strip())
270 elif meta['prim']['st']:
271 meta['prim']['txt'].append('\n'+l)
272 elif meta['primd']['st']:
273 meta['primd']['txt'].append('\n'+l)
274 elif meta['sec']['st']:
275 meta['sec']['txt'].append('\n'+l)
276 elif meta['des']['st']:
277 meta['des']['txt'].append('\n'+l)
278 elif meta['tc']['st']:
279 meta['tc']['txt'].append('\n'+l)
281 documentation = Documentation(csvfile)
282 documentation.req_writerow("".join(meta['prim']['txt']),
283 "".join(meta['primd']['txt']),
285 documentation.hld_writerow("".join(meta['prim']['txt']),
286 "".join(meta['des']['txt']),
288 documentation.tp_writerow("".join(meta['prim']['txt']),
289 "".join(meta['sec']['txt']),
290 "".join(meta['tc']['txt']),
293 def run(self, progress=-1):
294 global global_test_results
297 name = self.name.replace('test_', '')
299 preferences = common.global_pref
301 module = __import__(self.name)
302 # force a reload in case it has changed since it has
303 # been previously be imported
304 importlib.reload(module)
306 module_run = getattr(module, 'run')
307 except Exception as e:
311 if hasattr(module_run, '__call__'):
313 if type(self.__callbacks) is TestSuiteCallbacks and \
314 'clean' in self.__callbacks:
316 logging.critical("CLEANING UP BEFORE -->" + self.name)
317 self.__callbacks['clean']()
318 except Exception as e:
319 logging.critical("EXCEPTION CLEANING BEFORE -->" + self.name)
320 if preferences['halt_on_exception']:
323 # if the script went out of its way to say I want to halt all execution
325 if type(e) == LUTFError and e.halt:
328 rc = {'status': 'FAIL', 'error': str(e)}
331 rc['reason'] = 'Test setup cleanup failed'
334 global_test_results["lutf-"+self.__parent_suite] = rc
337 logging.critical("Started test script: %s" % str(self.name))
338 start_time = datetime.datetime.now()
340 except Exception as e:
341 if preferences['halt_on_exception']:
344 # if the script went out of its way to say I want to halt all execution
346 if type(e) == LUTFError and e.halt:
349 rc = {'status': 'FAIL', 'error': str(e)}
351 logging.debug("Finished test script: %s" % str(self.name))
352 duration = datetime.datetime.now() - start_time
353 rc['duration'] = int(round(duration.total_seconds()))
354 if rc['status'] == 'FAIL' and self.is_expected_failure(name):
355 rc['status'] = 'EXPECTED FAILURE'
356 rc['return_code'] = 0
357 elif rc['status'] == 'FAIL':
358 rc['return_code'] = -22
360 rc['return_code'] = 0
362 global_test_results["lutf-"+self.__parent_suite] = rc
363 logging.debug("%s took %s to run" % (str(self.name), duration))
366 if clutf_global.get_lutf_mode() == clutf_global.EN_LUTF_RUN_INTERACTIVE:
367 print(name+"\t"+str(progress)+"%"+" "*30, end='\r')
369 print(name+"\t"+str(progress)+"%"+" "*30)
371 with open(me.get_test_progress_path(), 'a+') as f:
372 out = '== lutf-' + self.__parent_suite + ' test ' + \
373 name + ' ============ ' + str(progress) + "% complete\n"
376 if type(self.__callbacks) is TestSuiteCallbacks and \
377 'clean' in self.__callbacks and progress == 100:
379 self.__callbacks['clean']()
381 logging.critical("Failed to clean at end of suite:" + self.__parent_suite)
385 with open(self.__abs_path, 'r') as f:
387 print(line.strip('\n'))
391 preferences = common.global_pref
394 subprocess.call(preferences['editor']+" "+self.__abs_path, shell=True)
396 logging.critical("No editor available")
397 print("No editor available")
399 class TestCollection:
400 def __init__(self, base, name, callbacks, skip_list, expected_failures):
401 self.__suite_name = name
405 self.__abs_path = os.path.join(base, name)
406 self.__callbacks = callbacks
407 self.__skip_list = skip_list
408 self.__expected_failures = expected_failures
411 def __getitem__(self, key):
413 rc = self.__test_db[key]
415 raise LUTFError('no entry for ' + str(key))
422 # needed for python 3.x
424 if self.__n < self.__max:
425 key = list(self.__test_db.keys())[self.__n]
426 suite = self.__test_db[key]
432 def __generate_test_db(self, db):
433 # lutf/python/tests/suite_xxx has a list of tests
434 # make a dictionary of each of these. Each test script
435 # should start with "test_"
436 for subdir, dirs, files in os.walk(self.__abs_path):
439 if f.startswith('test_') and os.path.splitext(f)[1] == '.py':
440 # add any subidrectories to the sys path
441 if subdir != '.' and not added:
442 subdirectory = os.path.join(self.__abs_path, subdir)
443 if subdirectory not in sys.path:
444 sys.path.append(subdirectory)
446 name = os.path.splitext(f.replace('test_', ''))[0]
447 db[name] = Script(os.path.join(self.__abs_path, subdir, f), self.__callbacks, self.__suite_name, self)
449 self.__max = len(self.__test_db)
def in_expected_failures_list(self, name):
    """Return True when ``name`` is in this collection's expected failures."""
    expected = self.__expected_failures
    return name in expected
def __in_skip_list(self, name):
    """Return True when ``name`` is in this collection's skip list."""
    skipped = self.__skip_list
    return name in skipped
459 self.__generate_test_db(self.__test_db)
461 def get_num_scripts(self, match='*'):
463 for key in sorted(self.__test_db.keys()):
464 if fnmatch.fnmatch(key, match) and not self.__in_skip_list(key):
468 # run all the scripts in this test suite
469 def run(self, match='*', num_scripts=0):
470 # get number of scripts
472 num_scripts = self.get_num_scripts(match)
476 with open(me.get_test_progress_path(), 'a+') as f:
477 out = '-----============= lutf-' + self.__suite_name.replace('suite_', '') + "\n"
481 for key in sorted(self.__test_db.keys()):
482 if fnmatch.fnmatch(key, match) and not self.__in_skip_list(key):
484 progress = int((executed / num_scripts) * 100)
485 self.__test_db[key].run(progress)
def create_docs(self, csvfile, match='*'):
    """Generate documentation for every script whose key matches ``match``.

    ``match`` is an ``fnmatch`` pattern applied to the script keys; each
    matching script's ``create_docs(csvfile)`` is called in database order.
    """
    for script_name, script in self.__test_db.items():
        if not fnmatch.fnmatch(script_name, match):
            continue
        script.create_docs(csvfile)
493 return list(self.__test_db.keys())
495 def dump(self, match='*'):
496 scripts_dict = {'scripts': []}
497 for k, v in self.__test_db.items():
498 if fnmatch.fnmatch(k, match):
499 if self.in_expected_failures_list(k):
500 scripts_dict['scripts'].append(k+' (expected failure)')
501 elif self.__in_skip_list(k):
502 scripts_dict['scripts'].append(k+' (skip)')
504 scripts_dict['scripts'].append(k)
505 scripts_dict['scripts'].sort()
506 print(yaml.dump(scripts_dict, Dumper=LutfDumper, indent=2, sort_keys=True))
def get_suite_name(self):
    """Return the suite name this collection was created with."""
    suite_name = self.__suite_name
    return suite_name
512 return len(self.__test_db)
def add(self, script):
    """Create a new script in this suite by copying the sample template.

    The template is ``python/tests/sample.py`` under the LUTF path; it is
    copied to ``script`` inside this collection's directory.

    Raises:
        LUTFError: if the sample template is missing.  The message names
            the missing path, which requires the ``%s`` placeholder to be
            formatted.
    """
    default_script = os.path.join(clutf_global.get_lutf_path(), 'python', 'tests', 'sample.py')
    if not os.path.isfile(default_script):
        raise LUTFError("%s does not exist. Corrupted LUTF installation" % default_script)
    shutil.copy(default_script, os.path.join(self.__abs_path, script))
521 class TestSuiteCallbacks:
522 def __init__(self, **kwargs):
523 if type(kwargs) is not dict:
524 raise LUTFError("Must specify a dictionary")
525 self.__callbacks = kwargs
526 def __contains__(self, key):
527 return key in self.__callbacks
528 def __getitem__(self, key):
530 rc = self.__callbacks[key]
532 raise LUTFError('no entry for ' + str(key))
535 print(yaml.dump(self.__callbacks, Dumper=LutfDumper, indent=2, sort_keys=True))
538 def __init__(self, base, name):
540 self.__callback_reg = False
541 self.__callbacks = None
543 self.__abs_path = os.path.join(base, name)
545 self.__skip_list = []
546 self.__expected_failures = []
547 if self.__abs_path not in sys.path:
548 sys.path.append(self.__abs_path)
551 def __register_callbacks(self):
552 if self.__callback_reg:
554 # find callbacks module in this suite and get the callbacks
555 for subdir, dirs, files in os.walk(self.__abs_path):
558 if f == 'callbacks.py' and not self.__callback_reg:
559 mod_name = self.name+'.'+'callbacks'
560 module = __import__(mod_name)
561 importlib.reload(module)
563 ### TODO Add more test suite callbacks here
564 setup_clean_cb = getattr(module.callbacks, "lutf_clean_setup")
565 if hasattr(setup_clean_cb, '__call__'):
566 self.__callbacks = TestSuiteCallbacks(clean=setup_clean_cb)
567 except Exception as e:
568 logging.critical(str(e))
569 self.callback_reg = True
572 mod_name = self.name+'.'+'skip'
573 module = __import__(mod_name)
574 importlib.reload(module)
576 if type(module.skip.skip_list) != list:
577 logging.critical('malformed skip list')
580 self.__skip_list = module.skip.skip_list
584 self.__expected_failures = module.skip.expected_failures
587 except Exception as e:
588 logging.critical(str(e))
593 self.__callback_reg = False
594 self.__register_callbacks()
595 self.scripts = TestCollection(self.__base, self.name, self.__callbacks, self.__skip_list, self.__expected_failures)
def dump(self, match='*'):
    """Delegate to the script collection's ``dump`` with the given pattern."""
    collection = self.scripts
    collection.dump(match)
601 return self.scripts.list()
def create_docs(self, csvfile, match='*'):
    """Delegate documentation generation to the script collection."""
    collection = self.scripts
    collection.create_docs(csvfile, match)
def get_num_scripts(self, match='*'):
    """Return what the script collection's ``get_num_scripts(match)`` reports."""
    count = self.scripts.get_num_scripts(match)
    return count
def run(self, match='*', num_scripts=0):
    """Run the suite's scripts by delegating to the script collection.

    Both arguments are forwarded by keyword.
    """
    collection = self.scripts
    collection.run(match=match, num_scripts=num_scripts)
def get_abs_path(self):
    """Return the path stored for this suite's directory."""
    suite_path = self.__abs_path
    return suite_path
def add(self, script):
    """Add a new test script named after ``script`` to this suite.

    The file name is normalised to ``test_<stem>.py``, where ``<stem>`` is
    ``script`` without its extension.  The duplicate check looks inside
    this suite's directory, since that is where the script collection
    places the new file; checking the bare name would test the current
    working directory instead.

    Raises:
        LUTFError: if a script with that name already exists in the suite.
    """
    new_name = 'test_'+os.path.splitext(script)[0]+'.py'
    if os.path.isfile(os.path.join(self.__abs_path, new_name)):
        raise LUTFError("%s already exists" % (str(new_name)))
    self.scripts.add(new_name)
624 This class stores all the available test suites.
625 The following methods are available for the suites:
626 list() - list all the suites
627 run() - run all the suites
628 dump() - YAML output of the suites available
629 create_docs() - create document for all suites
630 A single suite can be accessed as follows:
631 suites['name of suite']
632 A single suite provides the following methods:
633 list() - list all the scripts in the suite
634 run() - Run all the scripts in the suite
635 dump() - YAML output of the scripts available
636 create_docs() - create document for this suite
637 A single script can be accessed as follows:
638 suites['name of suite'].scripts['name of script']
639 A single script provides the following methods:
640 edit() - edit the script
641 show() - show the script
642 run() - run the script
645 # iterate over the test scripts directory and generate
646 # An internal database
650 self.__lutf_path = clutf_global.get_lutf_path()
651 if len(self.__lutf_path) == 0:
652 raise LUTFError('No LUTF path provided')
653 self.__lutf_tests = self.__lutf_path + '/python/tests/'
654 if not os.path.isdir(self.__lutf_tests):
655 raise LUTFError('No tests suites director: ' + sef.lutf_tests)
656 self.__generate_test_db(self.__test_db)
658 def __getitem__(self, key):
660 rc = self.__test_db[key]
662 raise LUTFError('no entry for ' + str(key))
669 # needed for python 3.x
671 if self.__n < self.__max:
672 key = list(self.__test_db.keys())[self.__n]
673 suite = self.__test_db[key]
679 def __generate_test_db(self, db):
680 # lutf/python/tests has a directory for each test suite
681 # make a dictionary of each of these. The lutf/python/tests
682 # is one level hierarchy. Each directory suite should start
684 for subdir, dirs, files in os.walk(self.__lutf_tests):
687 if d.startswith('suite_'):
688 name = d.replace('suite_', '')
689 db[name] = ATestSuite(self.__lutf_tests, d)
691 self.__max = len(self.__test_db)
def create_docs(self, csvfile, match='*'):
    """Generate documentation for every suite whose name matches ``match``.

    ``match`` is an ``fnmatch`` pattern applied to the suite names; each
    matching suite's ``create_docs(csvfile)`` is called in database order.
    """
    for suite_name, suite in self.__test_db.items():
        if not fnmatch.fnmatch(suite_name, match):
            continue
        suite.create_docs(csvfile)
698 # run all the test suites
699 def run(self, suite_list='*', match='*'):
701 if suite_list == '*':
702 sl = list(self.__test_db.keys())
704 sl = [item for item in re.split(',| ', suite_list) if len(item.strip()) > 0]
706 for k, v in self.__test_db.items():
708 numscripts[k] = v.get_num_scripts('*')
710 for k, v in self.__test_db.items():
712 v.run(num_scripts=numscripts[k])
716 self.__generate_test_db(self.__test_db)
719 return len(self.__test_db)
722 return list(self.__test_db.keys())
def dump(self, match='*'):
    """Print, as YAML, the sorted names of the suites matching ``match``.

    ``match`` is an ``fnmatch`` pattern applied to the suite names.  The
    output is a mapping with the single key ``suites``.
    """
    matching = sorted(name for name in self.__test_db
                      if fnmatch.fnmatch(name, match))
    suites_dict = {'suites': matching}
    print(yaml.dump(suites_dict, Dumper=LutfDumper, indent=2, sort_keys=True))
734 Class which represents this LUTF instance.
735 It allows extraction of:
736 - interfaces available
742 It provides an exit method to exit the LUTF instance
744 def __init__(self, name, telnet_port):
746 preferences = common.global_pref
748 self.__hostname = socket.gethostname()
749 self.__lutf_telnet_server = None
750 self.__lutf_telnet_port = telnet_port
751 self.__lutf_listen_port = clutf_global.get_master_port()
752 self.__lutf_type = clutf_global.get_lutf_type()
753 lscpu = lutf_exec_local_cmd('/usr/bin/lscpu')
754 self.__cpuinfo = yaml.load(lscpu[0].decode('utf-8'), Loader=yaml.FullLoader)
755 cfg_path = clutf_global.get_lutf_cfg_file_path()
757 raise LUTFError("No LUTF config file provided")
758 with open(cfg_path, 'r') as f:
759 self.lutf_cfg = yaml.load(f, Loader=yaml.FullLoader)
760 config_ifs_num = MIN_IFS_NUM_DEFAULT
761 logging.critical('CONFIGURATION CONTENT--->' + str(self.lutf_cfg))
762 if "num_intfs" in self.lutf_cfg['lutf']:
763 config_ifs_num = self.lutf_cfg['lutf']['num_intfs']
764 if "lutf-env-vars" in self.lutf_cfg['lutf']:
765 self.import_env_vars(self.lutf_cfg['lutf']['lutf-env-vars'])
766 if "lustre-path" in self.lutf_cfg['lutf']:
767 self.__lustre_base_path = os.path.split(self.lutf_cfg['lutf']['lustre-path'])[0]
768 set_lustre_base_path(self.__lustre_base_path)
770 self.__lustre_base_path = ''
771 self.alias_list = self.provision_intfs(config_ifs_num)
772 # delete any older test_progress files
773 if os.path.isfile(self.get_test_progress_path()) and self.__lutf_type == EN_LUTF_MASTER:
774 os.remove(self.get_test_progress_path())
776 def import_env_vars(self, fpath):
777 with open(fpath, 'r') as f:
778 for line in f.readlines():
779 if 'export ' in line:
780 s = line.replace('export ', '')
782 os.environ[kv[0].strip()] = kv[1].strip().strip('"')
def get_lustre_base_path(self):
    """Return the Lustre base path stored on this instance."""
    base_path = self.__lustre_base_path
    return base_path
787 def get_test_progress_path(self):
788 if 'test-progress' in self.lutf_cfg['lutf']:
789 path = self.lutf_cfg['lutf']['test-progress']
791 path = clutf_global.get_lutf_tmp_dir()
792 path = os.path.join(path, 'lutf_test_progress.out')
def get_local_interface_names(self):
    """Return the interface names reported by ``netifaces.interfaces()``."""
    names = netifaces.interfaces()
    return names
def get_local_interface_ip(self, name):
    """Return the ``addr`` field of the first AF_INET entry of ``name``."""
    inet_entries = netifaces.ifaddresses(name)[netifaces.AF_INET]
    first_entry = inet_entries[0]
    return first_entry['addr']
def get_local_interface_nm(self, name):
    """Return the ``netmask`` field of the first AF_INET entry of ``name``."""
    inet_entries = netifaces.ifaddresses(name)[netifaces.AF_INET]
    first_entry = inet_entries[0]
    return first_entry['netmask']
def get_local_interface_bc(self, name):
    """Return the ``broadcast`` field of the first AF_INET entry of ``name``."""
    inet_entries = netifaces.ifaddresses(name)[netifaces.AF_INET]
    first_entry = inet_entries[0]
    return first_entry['broadcast']
811 if (len(self.alias_list) > 0):
812 for alias in self.alias_list:
813 del_ip_alias_cmd_str = "/usr/sbin/ip addr del " + alias
814 rc = lutf_exec_local_cmd(del_ip_alias_cmd_str)
816 if "ERROR" in ret_str:
817 error = "Uexpected result when deleting an alias ip: %s\n" % (ret_str)
819 print("Shutting down the LUTF")
def get_cpuinfo(self):
    """Return the CPU information object stored on this instance."""
    info = self.__cpuinfo
    return info
def get_num_cpus(self):
    """Return the ``'CPU(s)'`` entry of the stored CPU information."""
    info = self.__cpuinfo
    return info['CPU(s)']
def get_num_numa_nodes(self):
    """Return the ``'NUMA node(s)'`` entry of the stored CPU information."""
    info = self.__cpuinfo
    return info['NUMA node(s)']
831 def list_intfs(self):
833 Return a list of all the interfaces available on this node
835 intfs = {'interfaces': {}}
836 for intf in self.get_local_interface_names():
838 intfs['interfaces'][intf] = {'ip': self.get_local_interface_ip(intf),
839 'netmask': self.get_local_interface_nm(intf),
840 'broadcast': self.get_local_interface_bc(intf)}
845 def dump_intfs(self):
847 Dump the interfaces in YAML format
849 print(yaml.dump(self.list_intfs(), sort_keys=False))
853 Return the symbolic name assigned to this LUTF instance
857 def my_hostname(self):
859 Return the hostname of this node
861 return self.__hostname
865 Return the type of this LUTF instance
867 if self.__lutf_type == EN_LUTF_MASTER:
869 elif self.__lutf_type == EN_LUTF_AGENT:
871 raise LUTFError("Undefined LUTF role: %d" % (self.__lutf_type))
873 def my_telnetport(self):
875 Return the telnet port of this LUTF instance
877 return self.__lutf_telnet_port
879 def my_listenport(self):
881 Return the listen port of this LUTF instance
883 return self.__lutf_listen_port
885 def handle_rpc_req(self, rpc_yaml):
891 #rpc_str = rpc_yaml.decode('utf-8')
892 y = yaml.load(rpc_yaml, Loader=yaml.FullLoader)
893 # check to see if this is for me
894 target = y['rpc']['dst']
895 if target != self.name:
896 logging.critical("RPC intended to %s but I am %s" % (target, self.name))
898 source = y['rpc']['src']
899 name = os.path.split(os.path.splitext(y['rpc']['script'])[0])[1]
900 path = os.path.split(os.path.splitext(y['rpc']['script'])[0])[0]
901 if path not in sys.path:
902 sys.path.append(path)
903 rpc_type = y['rpc']['type']
904 if rpc_type == 'function_call':
905 function_name = y['rpc']['function']
906 elif rpc_type == 'method_call':
907 class_name = y['rpc']['class']
908 method_name = y['rpc']['method']
909 class_id = y['rpc']['class_id']
910 elif rpc_type == 'instantiate_class' or rpc_type == 'destroy_class':
911 class_name = y['rpc']['class']
912 class_id = y['rpc']['class_id']
914 raise LUTFError('Unexpected rpc')
916 module = __import__(name)
917 importlib.reload(module)
918 args = y['rpc']['parameters']['args']
919 kwargs = y['rpc']['parameters']['kwargs']
920 lutf_exception_string = None
922 if rpc_type == 'function_call':
923 module_func = getattr(module, function_name)
924 if hasattr(module_func, '__call__'):
925 rc = module_func(*args, **kwargs)
926 elif rpc_type == 'instantiate_class':
927 my_class = getattr(module, class_name)
928 instance = my_class(*args, **kwargs)
929 common.add_to_class_db(instance, class_id)
930 elif rpc_type == 'destroy_class':
931 instance = common.get_class_from_db(class_id)
933 common.del_entry_from_class_db(class_id)
934 elif rpc_type == 'method_call':
935 instance = common.get_class_from_db(class_id)
936 if type(instance).__name__ != class_name:
937 raise LUTFError("requested class %s, but id refers to class %s" % (class_name, type(instance).__name__))
938 rc = getattr(instance, method_name)(*args, **kwargs)
939 except Exception as e:
940 if type(e) == LUTFError:
941 lutf_exception_string = e
943 exception_list = traceback.format_stack()
944 exception_list = exception_list[:-2]
945 exception_list.extend(traceback.format_tb(sys.exc_info()[2]))
946 exception_list.extend(traceback.format_exception_only(sys.exc_info()[0], sys.exc_info()[1]))
947 header = "Traceback (most recent call last):\n"
948 stacktrace = "".join(exception_list)
949 lutf_exception_string = header+stacktrace
950 if lutf_exception_string:
951 rc_yaml = populate_rpc_rsp(self.name, source, rc, lutf_exception_string)
953 rc_yaml = populate_rpc_rsp(self.name, source, rc)
954 lutf_send_rpc_rsp(source, yaml.dump(rc_yaml))
956 def provision_intfs(self, num_intf_req):
957 # if there are enough interfaces, don't need to add aliases
958 intfs_dict = self.list_intfs()
959 intfs = list(intfs_dict['interfaces'].keys())
960 num_available = len(intfs)
961 if num_available >= num_intf_req:
963 # add aliases for the last available interface
964 base_intf_name = intfs[num_available-1]
965 base_ip = netifaces.ifaddresses(base_intf_name)[netifaces.AF_INET][0]['addr']
966 base_ip_netmask = netifaces.ifaddresses(base_intf_name)[netifaces.AF_INET][0]['netmask']
967 base_ip_netmask_bits = sum(bin(int(x)).count('1') for x in base_ip_netmask.split('.'))
968 intf_ip_alias = base_ip
970 intf_ip_alias_split = intf_ip_alias.split(separator)
972 alias_param_list = []
973 for i in range(0, num_intf_req - num_available):
974 intf_name_alias = base_intf_name + ":" + str(i)
975 alias_confirmed = False
976 intf_ip_alias_candidate_split = intf_ip_alias_split[:]
978 # try to find available ip address
979 while ip_incr < 254 and not alias_confirmed:
980 # increment ip addr candidate
981 intf_ip_alias_candidate_split[3] = str((int(intf_ip_alias_split[3])+ip_incr)%255)
982 intf_ip_alias = separator.join(intf_ip_alias_candidate_split)
985 rc = lutf_exec_local_cmd("/usr/bin/ping -c 3 " + intf_ip_alias)
987 except Exception as e:
988 if "Host Unreachable" in str(e):
989 alias_confirmed = True
991 if not alias_confirmed:
992 error = "Failed to allocate ip address for alias if %s\n" % (intf_name_alias)
994 return alias_param_list
995 print("adding alias: ", intf_name_alias, " ip: ", intf_ip_alias)
996 # build the command string for adding the alias, back up for clean-up on exit
997 add_ip_alias_params = intf_ip_alias + "/" + str(base_ip_netmask_bits)
998 add_ip_alias_params += " brd + dev " + base_intf_name + " label " + intf_name_alias
999 add_ip_alias_cmd_str = "/usr/sbin/ip addr add " + add_ip_alias_params
1000 rc = lutf_exec_local_cmd(add_ip_alias_cmd_str)
1002 if "Error" in ret_str:
1003 error = "Uexpected result when creating an alias ip: %s\n" % (ret_str)
1004 logging.debug(error)
1005 return alias_param_list
1006 alias_param_list.append(add_ip_alias_params)
1008 return alias_param_list
1011 # Dump the global results to console or to file
1012 def dumpGlobalTestResults(fname=None, status=None, desc=None):
1014 Dump the YAML results for tests which ran so far
1016 global global_test_results
1018 results = global_test_results.get()
1022 # if this is path then use it as is, otherwise put it in the tmp dir
1023 if os.sep not in fname:
1024 fpath = os.path.join(clutf_global.get_lutf_tmp_dir(), fname)
1025 with open(fpath, 'w') as f:
1026 f.write(yaml.dump(results,
1027 Dumper=LutfDumper, indent=2,
1030 print(yaml.dump(results, Dumper=LutfDumper, indent=2, sort_keys=False))
1036 for p in LUTF_SCRIPT_PATHS:
1037 path = os.path.join(clutf_global.get_lutf_path(),p)
1038 if path not in sys.path:
1039 sys.path.append(path)
1040 lutf_tmp_dir = clutf_global.get_lutf_tmp_dir()
1041 Path(lutf_tmp_dir).mkdir(parents=True, exist_ok=True)
1043 logging.basicConfig(filename=os.path.join(clutf_global.get_lutf_tmp_dir(), "lutf_py.log"),
1047 # All test results are stored in here
1048 # Accessor functions can be used to dump it.
1049 global_test_results = YamlGlobalTestResults()
1051 suites = TestSuites()
1053 agents = lutf_agent.LutfAgents()
1055 logging.critical("INSTANTIATING myself")
1056 me = Myself(clutf_global.get_node_name(),
1057 clutf_global.get_agent_telnet_port())
1059 # Convenience Variables
1060 R = dumpGlobalTestResults
1065 preferences = load_pref()
1067 set_logging_level('debug')