From ba1fa08a0fd13a82c86db04d0e1b15c32126ba2f Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Tue, 24 Mar 2020 19:23:43 -0700 Subject: [PATCH] LU-10973 lnet: LUTF Python infra Added the python LUTF infrastructure. The python infrastructure provides the core LUTF feature set. The tests-infra is lnet specific infrastructure to be used by LUTF test suites. Test-Parameters: trivial Signed-off-by: Amir Shehata Change-Id: I1d0336606625424880f1b64b1dd296d4c7ed85ea Signed-off-by: Serguei Smirnov Reviewed-on: https://review.whamcloud.com/38087 Tested-by: jenkins Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre-dkms.spec.in | 4 + lustre.spec.in | 4 + lustre/tests/Makefile.am | 3 + lustre/tests/auster | 8 + lustre/tests/lutf.sh | 85 ++ lustre/tests/lutf/Makefile.am | 8 +- lustre/tests/lutf/python/config/lutf_start.py | 279 +++++ lustre/tests/lutf/python/config/lutf_start.sh | 29 + lustre/tests/lutf/python/infra/lutf.py | 1068 ++++++++++++++++++++ lustre/tests/lutf/python/infra/lutf_agent.py | 137 +++ lustre/tests/lutf/python/infra/lutf_basetest.py | 94 ++ lustre/tests/lutf/python/infra/lutf_cmd.py | 51 + lustre/tests/lutf/python/infra/lutf_common_def.py | 226 +++++ lustre/tests/lutf/python/infra/lutf_exception.py | 71 ++ lustre/tests/lutf/python/infra/lutf_file.py | 146 +++ .../tests/lutf/python/infra/lutf_get_num_agents.py | 10 + lustre/tests/lutf/python/infra/lutf_paramiko.py | 47 + lustre/tests/lutf/python/infra/lutf_telnet_cl.py | 169 ++++ lustre/tests/lutf/python/infra/lutf_telnet_sr.py | 164 +++ lustre/tests/lutf/python/infra/lutf_utils.py | 27 + lustre/tests/lutf/python/infra/telnetsrvlib.py | 779 ++++++++++++++ .../tests/lutf/python/tests-infra/clustre_reset.py | 6 + lustre/tests/lutf/python/tests-infra/fio.py | 157 +++ lustre/tests/lutf/python/tests-infra/lnet.py | 816 +++++++++++++++ .../tests/lutf/python/tests-infra/lnet_cleanup.py | 45 + .../tests/lutf/python/tests-infra/lnet_helpers.py | 731 ++++++++++++++ 
.../tests/lutf/python/tests-infra/lnet_selftest.py | 57 ++ .../lutf/python/tests-infra/lustre_cleanup.py | 111 ++ lustre/tests/lutf/python/tests-infra/lustre_fs.py | 188 ++++ .../tests/lutf/python/tests-infra/lustre_logs.py | 141 +++ .../tests/lutf/python/tests-infra/lustre_node.py | 269 +++++ .../tests/lutf/python/tests-infra/lustre_roles.py | 11 + .../lutf/python/tests-infra/selftest_template.py | 34 + .../tests/lutf/python/tests-infra/utility_paths.py | 202 ++++ lustre/tests/lutf/python/tests/sample.py | 19 + .../swig_templates/generate_lnetconfig_swig_i.py | 0 .../lutf/swig_templates/generate_lutf_swig_i.py | 0 37 files changed, 6195 insertions(+), 1 deletion(-) create mode 100644 lustre/tests/lutf.sh create mode 100644 lustre/tests/lutf/python/config/lutf_start.py create mode 100644 lustre/tests/lutf/python/config/lutf_start.sh create mode 100644 lustre/tests/lutf/python/infra/lutf.py create mode 100644 lustre/tests/lutf/python/infra/lutf_agent.py create mode 100644 lustre/tests/lutf/python/infra/lutf_basetest.py create mode 100644 lustre/tests/lutf/python/infra/lutf_cmd.py create mode 100644 lustre/tests/lutf/python/infra/lutf_common_def.py create mode 100644 lustre/tests/lutf/python/infra/lutf_exception.py create mode 100644 lustre/tests/lutf/python/infra/lutf_file.py create mode 100644 lustre/tests/lutf/python/infra/lutf_get_num_agents.py create mode 100644 lustre/tests/lutf/python/infra/lutf_paramiko.py create mode 100644 lustre/tests/lutf/python/infra/lutf_telnet_cl.py create mode 100644 lustre/tests/lutf/python/infra/lutf_telnet_sr.py create mode 100644 lustre/tests/lutf/python/infra/lutf_utils.py create mode 100644 lustre/tests/lutf/python/infra/telnetsrvlib.py create mode 100644 lustre/tests/lutf/python/tests-infra/clustre_reset.py create mode 100644 lustre/tests/lutf/python/tests-infra/fio.py create mode 100644 lustre/tests/lutf/python/tests-infra/lnet.py create mode 100644 lustre/tests/lutf/python/tests-infra/lnet_cleanup.py create mode 100644 
lustre/tests/lutf/python/tests-infra/lnet_helpers.py create mode 100644 lustre/tests/lutf/python/tests-infra/lnet_selftest.py create mode 100644 lustre/tests/lutf/python/tests-infra/lustre_cleanup.py create mode 100644 lustre/tests/lutf/python/tests-infra/lustre_fs.py create mode 100644 lustre/tests/lutf/python/tests-infra/lustre_logs.py create mode 100644 lustre/tests/lutf/python/tests-infra/lustre_node.py create mode 100644 lustre/tests/lutf/python/tests-infra/lustre_roles.py create mode 100644 lustre/tests/lutf/python/tests-infra/selftest_template.py create mode 100644 lustre/tests/lutf/python/tests-infra/utility_paths.py create mode 100644 lustre/tests/lutf/python/tests/sample.py mode change 100755 => 100644 lustre/tests/lutf/swig_templates/generate_lnetconfig_swig_i.py mode change 100755 => 100644 lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py diff --git a/lustre-dkms.spec.in b/lustre-dkms.spec.in index 26dd619..9ced9e6 100644 --- a/lustre-dkms.spec.in +++ b/lustre-dkms.spec.in @@ -2,6 +2,10 @@ %bcond_without zfs %bcond_with ldiskfs +# LUTF Turn off brp-python-precompile script as we don't want the python files +# to be compiled on installation +%global __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g') + # Set the package name prefix %if %{with servers} %if %{with zfs} diff --git a/lustre.spec.in b/lustre.spec.in index 453ad94..0609232 100644 --- a/lustre.spec.in +++ b/lustre.spec.in @@ -19,6 +19,10 @@ %bcond_without mpi %bcond_with kabi +# LUTF Turn off brp-python-precompile script as we don't want the python files +# to be compiled on installation +%global __os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g') + # By default both gss and gss keyring are disabled. # gss keyring requires the gss core. 
If the builder # request gss_keyring we must enable gss core even if diff --git a/lustre/tests/Makefile.am b/lustre/tests/Makefile.am index 110e0f8..45d51ce 100644 --- a/lustre/tests/Makefile.am +++ b/lustre/tests/Makefile.am @@ -30,6 +30,9 @@ noinst_SCRIPTS += recovery-mds-scale.sh run_dd.sh run_tar.sh run_iozone.sh noinst_SCRIPTS += run_dbench.sh run_IOR.sh recovery-double-scale.sh noinst_SCRIPTS += recovery-random-scale.sh parallel-scale.sh metadata-updates.sh noinst_SCRIPTS += lustre-rsync-test.sh ost-pools.sh rpc.sh yaml.sh +if BUILD_LUTF +noinst_SCRIPTS += lutf.sh +endif # BUILD_LUTF noinst_SCRIPTS += lnet-selftest.sh obdfilter-survey.sh mmp.sh mmp_mark.sh noinst_SCRIPTS += sgpdd-survey.sh maloo_upload.sh auster setup-nfs.sh noinst_SCRIPTS += mds-survey.sh parallel-scale-nfs.sh large-lun.sh diff --git a/lustre/tests/auster b/lustre/tests/auster index fc09ae0..7e2d61c 100755 --- a/lustre/tests/auster +++ b/lustre/tests/auster @@ -272,6 +272,14 @@ run_suites() { shift; export ONLY=$(split_commas $1) opts+="ONLY=$ONLY ";; + --suite) + shift; + export SUITE=$(split_commas $1) + opts+="SUITE=$SUITE ";; + --pattern) + shift; + export PATTERN=$(split_commas $1) + opts+="PATTERN=$PATTERN ";; --except) shift; export EXCEPT=$(split_commas $1) diff --git a/lustre/tests/lutf.sh b/lustre/tests/lutf.sh new file mode 100644 index 0000000..f512e9e --- /dev/null +++ b/lustre/tests/lutf.sh @@ -0,0 +1,85 @@ +# Run select tests by setting ONLY, or as arguments to the script. +# Skip specific tests by setting EXCEPT. +# + +export ONLY=${ONLY:-"$*"} +export SUITE=${SUITE:-"$*"} +export PATTERN=${PATTERN:-"$*"} + +[ "$SLOW" = "no" ] && EXCEPT_SLOW="" +# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! + +LUSTRE=${LUSTRE:-$(dirname $0)/..} + +. $LUSTRE/tests/test-framework.sh +init_test_env $@ +. 
${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh} +init_logging + +# bug number for skipped test: +ALWAYS_EXCEPT="$SANITY_LNET_EXCEPT " + +export LNETCTL=${LNETCTL:-"$LUSTRE/../lnet/utils/lnetctl"} +[ ! -f "$LNETCTL" ] && + export LNETCTL=$(which lnetctl 2> /dev/null) +[[ -z $LNETCTL ]] && skip "Need lnetctl" + +restore_mounts=false +if is_mounted $MOUNT || is_mounted $MOUNT2; then + cleanupall || error "Failed cleanup prior to test execution" + restore_mounts=true +fi + +cleanup_lnet() { + echo "Cleaning up LNet" + lsmod | grep -q lnet && + $LNETCTL lnet unconfigure 2>/dev/null + unload_modules +} + +restore_modules=false +if module_loaded lnet ; then + cleanup_lnet || error "Failed to unload modules before test execution" + restore_modules=true +fi + +cleanup_testsuite() { + trap "" EXIT + cleanup_lnet + if $restore_mounts; then + setupall || error "Failed to setup Lustre after test execution" + elif $restore_modules; then + load_modules || + error "Couldn't load modules after test execution" + fi + return 0 +} + +set_env_vars_on_remote() { + local list=$(comma_list $(all_nodes)) + do_rpc_nodes "$list" "echo $PATH; echo $LUSTRE; echo $LNETCTL; echo $LCTL" +} + +set_env_vars_on_remote + +rm /tmp/tf.skip + +# do not exit if LUTF fails +set +e + +echo "+++++++++++STARTING LUTF" +. $LUSTRE/tests/lutf/python/config/lutf_start.sh $CONFIG +rc=$? 
+echo "-----------STOPPING LUTF: $rc" + +if [ -d /tmp/lutf/ ]; then + tar -czf /tmp/lutf.tar.gz /tmp/lutf + mv /tmp/lutf.tar.gz $LOGDIR +fi + + +complete $SECONDS + +cleanup_testsuite + +exit $rc diff --git a/lustre/tests/lutf/Makefile.am b/lustre/tests/lutf/Makefile.am index 1485644..dae3c88 100644 --- a/lustre/tests/lutf/Makefile.am +++ b/lustre/tests/lutf/Makefile.am @@ -11,7 +11,13 @@ nobase_noinst_DATA += src/lutf_global.swg nobase_noinst_DATA += src/liblutf_connect.c nobase_noinst_DATA += src/liblutf_agent.c nobase_noinst_DATA += src/liblutf_global.c -EXTRA_DIST=$(nobase_noinst_DATA) +nobase_noinst_SCRIPTS = $(wildcard python/config/*.py) +nobase_noinst_SCRIPTS += $(wildcard python/config/lutf_start.sh) +nobase_noinst_SCRIPTS += $(wildcard python/infra/*.py) +nobase_noinst_SCRIPTS += $(wildcard python/tests/*.py) +nobase_noinst_SCRIPTS += $(wildcard python/tests-infra/*.py) +EXTRA_DIST=$(nobase_noinst_DATA) $(nobase_noinst_SCRIPTS) noinst_testdir = $(libdir)/lustre/tests/lutf nobase_noinst_test_DATA = $(nobase_noinst_DATA) +nobase_noinst_test_SCRIPTS = $(nobase_noinst_SCRIPTS) diff --git a/lustre/tests/lutf/python/config/lutf_start.py b/lustre/tests/lutf/python/config/lutf_start.py new file mode 100644 index 0000000..5327296 --- /dev/null +++ b/lustre/tests/lutf/python/config/lutf_start.py @@ -0,0 +1,279 @@ +""" +lutf_start.py is a script intended to be run from the lutf.sh +It relies on a set of environment variables to be set. If they +are not set the script will exit: + +*_HOST: nodes to run the LUTF on. 
They must be unique +ONLY: A script to run +SUITE: A suite to run +LUTF_SHELL: If specified it'll run the python interpreter +MASTER_PORT: The port on which the master will listen +TELNET_PORT: The port on which a telnet session can be established to the agent +LUSTRE: The path to the lustre tests directory +PYTHONPATH: The path where the python scripts and libraries are located +LUTFPATH: Path to the lutf directory + +Purpose: +-------- +start an instance of the LUTF on the master and the agents +""" + +import os, re, yaml, paramiko, copy +import shlex, subprocess, time +from pathlib import Path +from lutf_exception import LutfDumper +from lutf_paramiko import lutf_exec_remote_cmd, lutf_put_file, lutf_get_file + +cfg_yaml = {'lutf': {'shell': True, 'agent': False, 'telnet-port': -1, + 'master-address': None, 'master-port': -1, 'node-name': None, + 'master-name': None, 'lutf-path': None, 'py-path': None, 'lustre-path': None, + 'suite': None, 'suite-list': None, 'script': None, 'pattern': None, + 'agent-list': None, 'results': None, 'always_except': None, + 'num_intfs': 3, 'lutf-env-vars': None, 'test-progress': None, + 'tmp-dir': '/tmp/lutf/'}} + +class LUTF: + def __init__(self): + self.nodes = {} + self.__collect_nodes() + + def __collect_nodes(self): + for k in os.environ: + if '_HOST' in k or 'CLIENTS' in k: + hosts = re.split(',| ', os.environ[k]) + hosts = [x for x in hosts if x] + i = 0 + for h in hosts: + if len(h) > 0: + if h in list(self.nodes.values()): + print("Duplicate host: %s. LUTF expects unique hosts: %s. 
Skipping" \ + % (h, str(list(self.nodes.values())))) + continue + if i > 0: + self.nodes[k+str(i)] = h + else: + self.nodes[k] = h + i += 1 + + def __exec_local_cmd(self, cmd): + args = shlex.split(cmd) + out = subprocess.Popen(args, stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + t = out.communicate()[0],out.returncode + print(cmd+"\n"+"rc = "+str(t[1])+" "+t[0].decode("utf-8")) + return int(t[1]) + + def __stop_lutf(self, key, host): + # make sure you kill any other instances of the LUTF + if host != os.environ['HOSTNAME']: + lutf_exec_remote_cmd("pkill -9 lutf", host) + + def __install_deps_on_hosts(self, host): + try: + installbin = os.environ['INSTALLBIN'] + except: + installbin = 'yum' + try: + pip = os.environ['PIPBIN'] + except: + pip = 'pip3' + print("%s: %s install -y python3" % (host, installbin)) + lutf_exec_remote_cmd(installbin+" install -y python3", host, ignore_err=True) + print("%s: %s install paramiko" % (host, pip)) + lutf_exec_remote_cmd(pip+" install paramiko", host, ignore_err=True) + print("%s: %s install netifaces" % (host, pip)) + lutf_exec_remote_cmd(pip+" install netifaces", host, ignore_err=True) + print("%s: %s install pyyaml" % (host, pip)) + lutf_exec_remote_cmd(pip+" install pyyaml", host, ignore_err=True) + print("%s: %s install psutil" % (host, pip)) + lutf_exec_remote_cmd(pip+" install psutil", host, ignore_err=True) + + def __start_lutf(self, key, host, mname, master, agent_list=[], agent=True): + cfg = copy.deepcopy(cfg_yaml) + shell = 'batch' + try: + shell = os.environ['LUTF_SHELL'] + except: + pass + # agent will always run in daemon mode + if (agent): + cfg['lutf']['shell'] = 'daemon' + else: + cfg['lutf']['shell'] = shell + cfg['lutf']['agent'] = agent + cfg['lutf']['telnet-port'] = int(os.environ['TELNET_PORT']) + cfg['lutf']['master-address'] = master + cfg['lutf']['master-port'] = int(os.environ['MASTER_PORT']) + cfg['lutf']['node-name'] = key + cfg['lutf']['master-name'] = mname + cfg['lutf']['lutf-path'] = 
os.environ['LUTFPATH'] + cfg['lutf']['py-path'] = os.environ['PYTHONPATH'] + try: + sl = re.split(',| ', os.environ['SUITE']) + if len(sl) == 0: + raise ValueError + if len(sl) == 1: + cfg['lutf']['suite'] = os.environ['SUITE'] + del(cfg['lutf']['suite-list']) + else: + cfg['lutf']['suite-list'] = os.environ['SUITE'] + del(cfg['lutf']['suite']) + except: + if 'suite' in cfg['lutf']: + del(cfg['lutf']['suite']) + try: + cfg['lutf']['lustre-path'] = os.environ['LUSTRE'] + except: + if 'lustre-path' in cfg['lutf']: + del(cfg['lutf']['lustre-path']) + try: + cfg['lutf']['script'] = os.environ['ONLY'] + except: + if 'script' in cfg['lutf']: + del(cfg['lutf']['script']) + try: + cfg['lutf']['pattern'] = os.environ['PATTERN'] + except: + if 'pattern' in cfg['lutf']: + del(cfg['lutf']['pattern']) + try: + cfg['lutf']['results'] = os.environ['RESULTS'] + except: + if 'results' in cfg['lutf']: + del(cfg['lutf']['results']) + try: + cfg['lutf']['always_except'] = os.environ['ALWAYS_EXCEPT'] + except: + if 'always_except' in cfg['lutf']: + del(cfg['lutf']['always_except']) + try: + cfg['lutf']['num_intfs'] = os.environ['NUM_INTFS'] + except: + if 'num_intfs' in cfg['lutf']: + del(cfg['lutf']['num_intfs']) + try: + cfg['lutf']['lutf-env-vars'] = os.environ['LUTF_ENV_VARS'] + except: + if 'lutf-env-vars' in cfg['lutf']: + del(cfg['lutf']['lutf-env-vars']) + try: + cfg['lutf']['test-progress'] = os.environ['LUTF_TEST_PROGRESS'] + except: + if 'test-progress' in cfg['lutf']: + del(cfg['lutf']['test-progress']) + try: + cfg['lutf']['tmp-dir'] = os.environ['LUTF_TMP_DIR'] + except: + pass + try: + cfg['lutf']['tmp-dir'] = os.environ['LUTF_TMP_DIR'] + except: + pass + + if len(agent_list) > 0: + cfg['lutf']['agent-list'] = agent_list + else: + if 'agent-list' in cfg['lutf']: + del(cfg['lutf']['agent-list']) + + lutf_bin = 'lutf' + cfg_name = host+'.yaml' + lutf_cfg_path = os.path.join(cfg['lutf']['tmp-dir'], 'config') + Path(lutf_cfg_path).mkdir(parents=True, exist_ok=True) + 
lutf_cfg = os.path.join(lutf_cfg_path, cfg_name) + # write the config file + with open(lutf_cfg, 'w') as f: + f.write(yaml.dump(cfg, Dumper=LutfDumper, indent=4)) + # copy it over to the remote + if host != os.environ['HOSTNAME']: + lutf_exec_remote_cmd("mkdir -p " + lutf_cfg_path, host) + lutf_put_file(host, lutf_cfg, lutf_cfg) + + cmd = '' + + #setup the library path on the remote node + if 'LD_LIBRARY_PATH' in os.environ and host != os.environ['HOSTNAME']: + cmd = "LD_LIBRARY_PATH="+os.environ['LD_LIBRARY_PATH'] + + if 'PATH' in os.environ and host != os.environ['HOSTNAME']: + cmd += " PATH="+os.environ['PATH'] + + # start the LUTF on the remote + if cmd: + cmd += ' ' + cmd += lutf_bin+" --config "+lutf_cfg + + # make sure you kill any other instances of the LUTF + rc = 0 + if host != os.environ['HOSTNAME']: + lutf_exec_remote_cmd("pkill -9 lutf", host) + time.sleep(1) + print("%s: %s" % (host, cmd)) + try: + lutf_exec_remote_cmd(cmd, host) + except Exception as e: + print(e) + rc = -22 + else: + time.sleep(1) + print("%s: %s" % (host, cmd)) + try: + rc = self.__exec_local_cmd(cmd) + except Exception as e: + print(e) + rc = -22 + + return cfg, rc + + def __collect_lutf_logs(self, host): + if host != os.environ['HOSTNAME']: + rfname = "lutf."+host+".tar.gz" + rfpath = os.path.join(os.sep, 'tmp', rfname) + rtardir = os.path.join('tmp', 'lutf') + cmd = "tar -czf "+rfpath+" -C "+os.sep+" "+rtardir + lutf_exec_remote_cmd(cmd, host); + lutf_get_file(host, rfpath, os.path.join(os.sep, 'tmp', 'lutf', rfname)) + + def run(self): + master = '' + mname = '' + + if not os.environ['HOSTNAME'] in list(self.nodes.values()): + for k in list(self.nodes.keys()): + i = 1 + if 'CLIENTS' in k: + i += 1 + key = "CLIENTS"+str(i) + self.nodes[key] = os.environ['HOSTNAME'] + + agent_list = list(self.nodes.keys()) + if len(agent_list) == 0: + raise ValueError("no LUTF nodes defined") + + master = os.environ['HOSTNAME'] + mname = 
agent_list[list(self.nodes.values()).index(os.environ['HOSTNAME'])] + + rc = 0 + for k, v in self.nodes.items(): + if v != master: + cfg, rc = self.__start_lutf(k, v, mname, master) + + if rc == 0: + # run master locally + agent_list.remove(mname) + master_cfg, rc = self.__start_lutf(mname, master, mname, master, agent_list=agent_list, agent=False) + if master_cfg['lutf']['shell'] == 'batch': + for k, v in self.nodes.items(): + self.__stop_lutf(k, v) + + # collect all the logs + for k, v in self.nodes.items(): + if v != master: + self.__collect_lutf_logs(v); + + return rc + +if __name__ == '__main__': + lutf = LUTF() + rc = lutf.run() + exit(rc) diff --git a/lustre/tests/lutf/python/config/lutf_start.sh b/lustre/tests/lutf/python/config/lutf_start.sh new file mode 100644 index 0000000..88a6c4a --- /dev/null +++ b/lustre/tests/lutf/python/config/lutf_start.sh @@ -0,0 +1,29 @@ +# note the cfg/.sh should export all environment variables +# required. EX: export ost4_HOST=lustre01 + +export PYTHONPATH=$PYTHONPATH:$LUSTRE/tests/lutf/:$LUSTRE/tests/lutf/src/:$LUSTRE/tests/lutf/python:$LUSTRE/tests/lutf/python/tests/:$LUSTRE/tests/lutf/python/config/:$LUSTRE/tests/lutf/python/deploy:$LUSTRE/tests/lutf/python/infra + +export LUTFPATH=$LUSTRE/tests/lutf/ + +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LUTFPATH:$LUTFPATH/src:$LUSTRE/lnet/utils/lnetconfig/.libs/ + +export PATH=$PATH:$LUSTRE/tests/lutf/src:$LUSTRE/tests/lutf + +if [[ -z "${TELNET_PORT}" ]]; then + export TELNET_PORT=8181 +fi + +if [[ -z "${MASTER_PORT}" ]]; then + export MASTER_PORT=8282 +fi + +if [[ -z "${LUTF_SHELL}" ]]; then + export LUTF_SHELL=batch +fi + +if [[ -z "${PYTHONBIN}" ]]; then + export PYTHONBIN=python3 +fi + +export LUTF_ENV_VARS=$1 +$PYTHONBIN $LUSTRE/tests/lutf/python/config/lutf_start.py diff --git a/lustre/tests/lutf/python/infra/lutf.py b/lustre/tests/lutf/python/infra/lutf.py new file mode 100644 index 0000000..0077b98 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf.py @@ -0,0 +1,1068 
@@ +from pathlib import Path +from clutf_agent import * +from lutf_common_def import * +import lutf_common_def as common +from lutf_exception import LUTFError, LutfDumper +from lutf_cmd import lutf_exec_local_cmd +import importlib, socket +import clutf_global +import lutf_agent +import netifaces +import os, subprocess, sys, yaml, fnmatch, logging, csv +import shutil, traceback, datetime, re, copy + +preferences = {} +lutf_tmp_dir = '' + +class LutfYaml: + def __init__(self, y=None): + if y is not None and (type(y) is not dict and type(y) is not list): + raise LUTFError('This class takes dictionaries or lists only') + self.__yaml = y + + def get(self): + return self.__yaml + + def dump(self): + return yaml.dump(self.__yaml) + + def load(self, stream): + if self.__yaml: + raise LUTFError('There exists a YAML instance') + self.__yaml = yaml.load(stream, Loader=yaml.FullLoader) + + def unload(self): + self.__yaml = None + +class YamlResults: + def __init__(self): + self.__results = [] + self.__max = 0 + self.__n = 0 + + def __setitem__(self, key, value): + for i, e in enumerate(self.__results): + if e.get()['name'] == key: + value['name'] = key + self.__results[i] = LutfYaml(value) + return + value['name'] = key + self.__results.append(LutfYaml(value)) + self.__max = len(self.__results) + + def __getitem__(self, key): + for entry in self.__results: + if entry.get()['name'] == key: + return entry + return None + + def __iter__(self): + self.__n = 0 + return self + + # needed for python 3.x + def __next__(self): + if self.__n < self.__max: + rc = self.__results[self.__n] + self.__n += 1 + return rc['name'], rc.get() + else: + raise StopIteration + + def get(self, status=None): + shadow = [] + for entry in self.__results: + e = entry.get() + if status and type(status) == str: + if e['status'] != status.upper(): + continue + shadow.append(entry.get()) + return shadow + +# subtest_result = YamlResults +# global_test_resutls['lutf-dlc']['script-name'] = rc +class 
YamlGlobalTestResults: + def __init__(self, desc=None): + if not desc: + self.desc = 'auster lutf' + self.__results = {'Tests': []} + self.__max = 0 + self.__n = 0 + + def __setitem__(self, key, value): + if type(value) != dict: + raise TypeError("This class only takes dict type") + for i, e in enumerate(self.__results['Tests']): + if e['name'] == key: + self.__results['Tests'][i]['SubTests'][value['name']] = value + self.finalize(key) + return + lutf = {'name': key, 'description': self.desc, 'SubTests': YamlResults()} + lutf['SubTests'][value['name']] = value + self.__results['Tests'].append(lutf) + self.__max = len(self.__results['Tests']) + self.finalize(key) + + def __getitem__(self, key): + for entry in self.__results['Tests']: + if entry['name'] == key: + return entry['SubTests'] + return None + + def __iter__(self): + self.__n = 0 + return self + + # needed for python 3.x + def __next__(self): + if self.__n < self.__max: + rc = self.__results['Tests'][self.__n] + self.__n += 1 + return rc['name'], rc + else: + raise StopIteration + + def finalize(self, name): + timefmt = datetime.datetime.utcnow().strftime('%a %b %d %H:%M:%S UTC %Y') + for e in self.__results['Tests']: + if e['name'] == name: + total_duration = 0 + sstatus = 'PASS' + subs = e['SubTests'].get() + for r in subs: + total_duration += r['duration'] + if r['status'] == 'FAIL': + sstatus = 'FAIL' + e['duration'] = total_duration + # TODO: Pass the LUTF for now until we clean up the tests + sstatus = 'PASS' + e['status'] = sstatus + e['submission'] = timefmt + + def get(self): + rc = copy.deepcopy(self.__results) + for t in rc['Tests']: + t['SubTests'] = t['SubTests'].get() + return rc + +class Documentation: + def __init__(self, base_name): + doc_path = os.path.join(clutf_global.get_lutf_path(), 'documentation') + Path(doc_path).mkdir(parents=True, exist_ok=True) + self.__req = os.path.join(clutf_global.get_lutf_path(), 'documentation', + os.path.splitext(base_name)[0]+'_req.csv') + self.__hld = 
os.path.join(clutf_global.get_lutf_path(), 'documentation', + os.path.splitext(base_name)[0]+'_hld.csv') + self.__tp = os.path.join(clutf_global.get_lutf_path(), 'documentation', + os.path.splitext(base_name)[0]+'_tp.csv') + self.__req_writeheader() + self.__hld_writeheader() + self.__tp_writeheader() + + def __req_writeheader(self): + if not os.path.isfile(self.__req): + header = ["Test Case ID", "Requirement Id", "Requirement Description"] + with open(self.__req, 'w') as fcsv: + writer = csv.writer(fcsv) + writer.writerow(header) + + def __hld_writeheader(self): + if not os.path.isfile(self.__req): + header = ["Test Case ID", "Requirement Id", "Design Notes"] + with open(self.__hld, 'w') as fcsv: + writer = csv.writer(fcsv) + writer.writerow(header) + + def __tp_writeheader(self): + if not os.path.isfile(self.__req): + header = ["Test Case ID", "Primary Requirement Id", "Secondary Requirement Id", "Test Case"] + with open(self.__tp, 'w') as fcsv: + writer = csv.writer(fcsv) + writer.writerow(header) + + def req_writerow(self, req_id, req_desc, fname): + with open(self.__req, 'a+') as fcsv: + writer = csv.writer(fcsv) + writer.writerow([fname, req_id, req_desc]) + + def hld_writerow(self, req_id, design, fname): + with open(self.__hld, 'a+') as fcsv: + writer = csv.writer(fcsv) + writer.writerow([fname, req_id, design]) + + def tp_writerow(self, preq_id, sreq_id, tc, fname): + with open(self.__tp, 'a+') as fcsv: + writer = csv.writer(fcsv) + writer.writerow([fname, preq_id, sreq_id, tc]) + +class Script: + def __init__(self, abs_path, callbacks, parent_suite, collection): + self.name = os.path.splitext(os.path.split(abs_path)[1])[0] + self.__abs_path = abs_path + self.__callbacks = callbacks + self.__parent_suite = parent_suite.replace('suite_', '') + self.__collection = collection + + def is_expected_failure(self, name): + return self.__collection.in_expected_failures_list(name) + + def create_docs(self, csvfile): + # open script and extract comment block. 
It is expected to + # be at the beginning of the file + doc = [] + start = False + with open(self.__abs_path, 'r') as f: + lines = f.readlines() + for l in lines: + if len(l.strip()) > 0 and l.strip() == '"""': + if start: + start = False + break + else: + start = True + elif start: + doc.append(l.strip()) + if len(doc) == 0: + return + + meta = {'prim': {'txt': [], 'st': False}, + 'primd': {'txt': [], 'st': False}, + 'sec': {'txt': [], 'st': False}, + 'des': {'txt': [], 'st': False}, + 'tc': {'txt': [], 'st': False}} + + for l in doc: + if '@PRIMARY:' in l: + meta['prim']['st'] = True + meta['primd']['st'] = False + meta['sec']['st'] = False + meta['des']['st'] = False + meta['tc']['st'] = False + meta['prim']['txt'].append(l.split('@PRIMARY:')[1].strip()) + elif '@PRIMARY_DESC:' in l: + meta['prim']['st'] = False + meta['primd']['st'] = True + meta['sec']['st'] = False + meta['des']['st'] = False + meta['tc']['st'] = False + meta['primd']['txt'].append(l.split('@PRIMARY_DESC:')[1].strip()) + elif '@SECONDARY:' in l: + meta['prim']['st'] = False + meta['primd']['st'] = False + meta['sec']['st'] = True + meta['des']['st'] = False + meta['tc']['st'] = False + meta['sec']['txt'].append(l.split('@SECONDARY:')[1].strip()) + elif '@DESIGN:' in l: + meta['prim']['st'] = False + meta['primd']['st'] = False + meta['sec']['st'] = False + meta['des']['st'] = True + meta['tc']['st'] = False + meta['des']['txt'].append(l.split('@DESIGN:')[1].strip()) + elif '@TESTCASE:' in l: + meta['prim']['st'] = False + meta['primd']['st'] = False + meta['sec']['st'] = False + meta['des']['st'] = False + meta['tc']['st'] = True + meta['tc']['txt'].append(l.split('@TESTCASE:')[1].strip()) + elif meta['prim']['st']: + meta['prim']['txt'].append('\n'+l) + elif meta['primd']['st']: + meta['primd']['txt'].append('\n'+l) + elif meta['sec']['st']: + meta['sec']['txt'].append('\n'+l) + elif meta['des']['st']: + meta['des']['txt'].append('\n'+l) + elif meta['tc']['st']: + 
meta['tc']['txt'].append('\n'+l) + + documentation = Documentation(csvfile) + documentation.req_writerow("".join(meta['prim']['txt']), + "".join(meta['primd']['txt']), + self.name) + documentation.hld_writerow("".join(meta['prim']['txt']), + "".join(meta['des']['txt']), + self.name) + documentation.tp_writerow("".join(meta['prim']['txt']), + "".join(meta['sec']['txt']), + "".join(meta['tc']['txt']), + self.name) + + def run(self, progress=-1): + global global_test_results + global preferences + + name = self.name.replace('test_', '') + + preferences = common.global_pref + + module = __import__(self.name) + # force a reload in case it has changed since it has + # been previously be imported + importlib.reload(module) + try: + module_run = getattr(module, 'run') + except Exception as e: + logging.critical(e) + return + # run the script + if hasattr(module_run, '__call__'): + skip_test = False + if type(self.__callbacks) is TestSuiteCallbacks and \ + 'clean' in self.__callbacks: + try: + logging.critical("CLEANING UP BEFORE -->" + self.name) + self.__callbacks['clean']() + except Exception as e: + logging.critical("EXCEPTION CLEANING BEFORE -->" + self.name) + if preferences['halt_on_exception']: + raise e + else: + # if the script went out of its way to say I want to halt all execution + # then honor that. + if type(e) == LUTFError and e.halt: + raise e + else: + rc = {'status': 'FAIL', 'error': str(e)} + skip_test = True + if skip_test: + rc['reason'] = 'Test setup cleanup failed' + rc['duration'] = 0 + rc['name'] = name + global_test_results["lutf-"+self.__parent_suite] = rc + return + try: + logging.critical("Started test script: %s" % str(self.name)) + start_time = datetime.datetime.now() + rc = module_run() + except Exception as e: + if preferences['halt_on_exception']: + raise e + else: + # if the script went out of its way to say I want to halt all execution + # then honor that. 
+ if type(e) == LUTFError and e.halt: + raise e + else: + rc = {'status': 'FAIL', 'error': str(e)} + + logging.debug("Finished test script: %s" % str(self.name)) + duration = datetime.datetime.now() - start_time + rc['duration'] = int(round(duration.total_seconds())) + if rc['status'] == 'FAIL' and self.is_expected_failure(name): + rc['status'] = 'EXPECTED FAILURE' + rc['return_code'] = 0 + elif rc['status'] == 'FAIL': + rc['return_code'] = -22 + else: + rc['return_code'] = 0 + rc['name'] = name + global_test_results["lutf-"+self.__parent_suite] = rc + logging.debug("%s took %s to run" % (str(self.name), duration)) + + if progress != -1: + if clutf_global.get_lutf_mode() == clutf_global.EN_LUTF_RUN_INTERACTIVE: + print(name+"\t"+str(progress)+"%"+" "*30, end='\r') + if progress == 100: + print(name+"\t"+str(progress)+"%"+" "*30) + else: + with open(me.get_test_progress_path(), 'a+') as f: + out = '== lutf-' + self.__parent_suite + ' test ' + \ + name + ' ============ ' + str(progress) + "% complete\n" + f.write(out) + f.flush() + if type(self.__callbacks) is TestSuiteCallbacks and \ + 'clean' in self.__callbacks and progress == 100: + try: + self.__callbacks['clean']() + except: + logging.critical("Failed to clean at end of suite:" + self.__parent_suite) + pass + + def show(self): + with open(self.__abs_path, 'r') as f: + for line in f: + print(line.strip('\n')) + + def edit(self): + global preferences + preferences = common.global_pref + + try: + subprocess.call(preferences['editor']+" "+self.__abs_path, shell=True) + except: + logging.critical("No editor available") + print("No editor available") + +class TestCollection: + def __init__(self, base, name, callbacks, skip_list, expected_failures): + self.__suite_name = name + self.__test_db = {} + self.__max = 0 + self.__n = 0 + self.__abs_path = os.path.join(base, name) + self.__callbacks = callbacks + self.__skip_list = skip_list + self.__expected_failures = expected_failures + self.reload() + + def 
__getitem__(self, key): + try: + rc = self.__test_db[key] + except: + raise LUTFError('no entry for ' + str(key)) + return rc + + def __iter__(self): + self.__n = 0 + return self + + # needed for python 3.x + def __next__(self): + if self.__n < self.__max: + key = list(self.__test_db.keys())[self.__n] + suite = self.__test_db[key] + self.__n += 1 + return key, suite + else: + raise StopIteration + + def __generate_test_db(self, db): + # lutf/python/tests/suite_xxx has a list of tests + # make a dictionary of each of these. Each test script + # should start with "test_" + for subdir, dirs, files in os.walk(self.__abs_path): + added = False + for f in files: + if f.startswith('test_') and os.path.splitext(f)[1] == '.py': + # add any subidrectories to the sys path + if subdir != '.' and not added: + subdirectory = os.path.join(self.__abs_path, subdir) + if subdirectory not in sys.path: + sys.path.append(subdirectory) + added = True + name = os.path.splitext(f.replace('test_', ''))[0] + db[name] = Script(os.path.join(self.__abs_path, subdir, f), self.__callbacks, self.__suite_name, self) + + self.__max = len(self.__test_db) + + def in_expected_failures_list(self, name): + return name in self.__expected_failures + + def __in_skip_list(self, name): + return name in self.__skip_list + + def reload(self): + self.__test_db = {} + self.__generate_test_db(self.__test_db) + + def get_num_scripts(self, match='*'): + num_scripts = 0 + for key in sorted(self.__test_db.keys()): + if fnmatch.fnmatch(key, match) and not self.__in_skip_list(key): + num_scripts += 1 + return num_scripts + + # run all the scripts in this test suite + def run(self, match='*', num_scripts=0): + # get number of scripts + if not num_scripts: + num_scripts = self.get_num_scripts(match) + + executed = 0 + + with open(me.get_test_progress_path(), 'a+') as f: + out = '-----============= lutf-' + self.__suite_name.replace('suite_', '') + "\n" + f.write(out) + f.flush() + + for key in 
sorted(self.__test_db.keys()): + if fnmatch.fnmatch(key, match) and not self.__in_skip_list(key): + executed += 1 + progress = int((executed / num_scripts) * 100) + self.__test_db[key].run(progress) + + def create_docs(self, csvfile, match='*'): + for k, v in self.__test_db.items(): + if fnmatch.fnmatch(k, match): + v.create_docs(csvfile) + + def list(self): + return list(self.__test_db.keys()) + + def dump(self, match='*'): + scripts_dict = {'scripts': []} + for k, v in self.__test_db.items(): + if fnmatch.fnmatch(k, match): + if self.in_expected_failures_list(k): + scripts_dict['scripts'].append(k+' (expected failure)') + elif self.__in_skip_list(k): + scripts_dict['scripts'].append(k+' (skip)') + else: + scripts_dict['scripts'].append(k) + scripts_dict['scripts'].sort() + print(yaml.dump(scripts_dict, Dumper=LutfDumper, indent=2, sort_keys=True)) + + def get_suite_name(self): + return self.__suite_name + + def len(self): + return len(self.__test_db) + + def add(self, script): + default_script = os.path.join(clutf_global.get_lutf_path(), 'python', 'tests', 'sample.py') + if not os.path.isfile(default_script): + raise LUTFError("%s does not exist. 
Corrupted LUTF installation") + rc = shutil.copy(default_script, + os.path.join(self.__abs_path, script)) + +class TestSuiteCallbacks: + def __init__(self, **kwargs): + if type(kwargs) is not dict: + raise LUTFError("Must specify a dictionary") + self.__callbacks = kwargs + def __contains__(self, key): + return key in self.__callbacks + def __getitem__(self, key): + try: + rc = self.__callbacks[key] + except: + raise LUTFError('no entry for ' + str(key)) + return rc + def dump(self): + print(yaml.dump(self.__callbacks, Dumper=LutfDumper, indent=2, sort_keys=True)) + +class ATestSuite: + def __init__(self, base, name): + self.__base = base + self.__callback_reg = False + self.__callbacks = None + self.name = name + self.__abs_path = os.path.join(base, name) + self.scripts = None + self.__skip_list = [] + self.__expected_failures = [] + if self.__abs_path not in sys.path: + sys.path.append(self.__abs_path) + self.reload() + + def __register_callbacks(self): + if self.__callback_reg: + return + # find callbacks module in this suite and get the callbacks + for subdir, dirs, files in os.walk(self.__abs_path): + break + for f in files: + if f == 'callbacks.py' and not self.__callback_reg: + mod_name = self.name+'.'+'callbacks' + module = __import__(mod_name) + importlib.reload(module) + try: + ### TODO Add more test suite callbacks here + setup_clean_cb = getattr(module.callbacks, "lutf_clean_setup") + if hasattr(setup_clean_cb, '__call__'): + self.__callbacks = TestSuiteCallbacks(clean=setup_clean_cb) + except Exception as e: + logging.critical(str(e)) + self.callback_reg = True + del(module) + elif f == 'skip.py': + mod_name = self.name+'.'+'skip' + module = __import__(mod_name) + importlib.reload(module) + try: + if type(module.skip.skip_list) != list: + logging.critical('malformed skip list') + continue + try: + self.__skip_list = module.skip.skip_list + except: + pass + try: + self.__expected_failures = module.skip.expected_failures + except: + pass + except 
Exception as e: + logging.critical(str(e)) + pass + del(module) + + def reload(self): + self.__callback_reg = False + self.__register_callbacks() + self.scripts = TestCollection(self.__base, self.name, self.__callbacks, self.__skip_list, self.__expected_failures) + + def dump(self, match='*'): + self.scripts.dump(match) + + def list(self): + return self.scripts.list() + + def create_docs(self, csvfile, match='*'): + self.scripts.create_docs(csvfile, match) + + def get_num_scripts(self, match='*'): + return self.scripts.get_num_scripts(match) + + def run(self, match='*', num_scripts=0): + self.scripts.run(match=match, num_scripts=num_scripts) + + def get_abs_path(self): + return self.__abs_path + + def add(self, script): + new_name = 'test_'+os.path.splitext(script)[0]+'.py' + if os.path.isfile(new_name): + raise LUTFError("%s already exists" % (str(new_name))) + self.scripts.add(new_name) + self.reload() + +class TestSuites: + ''' + This class stores all the available test suites. + The following methods are available for the suites: + list() - list all the suites + run() - run all the suites + dump() - YAML output of the suites available + create_docs() - create document for all suites + A single suite can be accessed as follows: + suites['name of suite'] + A single suite provides the following methods: + list() - list all the scripts in the suite + run() - Run all the scripts in the suite + dump() - YAML output of the scripts available + create_docs() - create document for this suite + A single script can be accessed as follows: + suites['name of suite'].scripts['name of script'] + A single script provides the following methods: + edit() - edit the script + show() - show the script + run() - run the script + ''' + def __init__(self): + # iterate over the test scripts directory and generate + # An internal database + self.__test_db = {} + self.__max = 0 + self.__n = 0 + self.__lutf_path = clutf_global.get_lutf_path() + if len(self.__lutf_path) == 0: + raise 
LUTFError('No LUTF path provided') + self.__lutf_tests = self.__lutf_path + '/python/tests/' + if not os.path.isdir(self.__lutf_tests): + raise LUTFError('No tests suites director: ' + sef.lutf_tests) + self.__generate_test_db(self.__test_db) + + def __getitem__(self, key): + try: + rc = self.__test_db[key] + except: + raise LUTFError('no entry for ' + str(key)) + return rc + + def __iter__(self): + self.__n = 0 + return self + + # needed for python 3.x + def __next__(self): + if self.__n < self.__max: + key = list(self.__test_db.keys())[self.__n] + suite = self.__test_db[key] + self.__n += 1 + return key, suite + else: + raise StopIteration + + def __generate_test_db(self, db): + # lutf/python/tests has a directory for each test suite + # make a dictionary of each of these. The lutf/python/tests + # is one level hierarchy. Each directory suite should start + # with "suite' + for subdir, dirs, files in os.walk(self.__lutf_tests): + break + for d in dirs: + if d.startswith('suite_'): + name = d.replace('suite_', '') + db[name] = ATestSuite(self.__lutf_tests, d) + + self.__max = len(self.__test_db) + + def create_docs(self, csvfile, match='*'): + for k, v in self.__test_db.items(): + if fnmatch.fnmatch(k, match): + v.create_docs(csvfile) + + # run all the test suites + def run(self, suite_list='*', match='*'): + numscripts = {} + if suite_list == '*': + sl = list(self.__test_db.keys()) + else: + sl = [item for item in re.split(',| ', suite_list) if len(item.strip()) > 0] + num_scripts = 0 + for k, v in self.__test_db.items(): + if k in sl: + numscripts[k] = v.get_num_scripts('*') + + for k, v in self.__test_db.items(): + if k in sl: + v.run(num_scripts=numscripts[k]) + + def reload(self): + self.__test_db = {} + self.__generate_test_db(self.__test_db) + + def len(self): + return len(self.__test_db) + + def list(self): + return list(self.__test_db.keys()) + + def dump(self, match='*'): + suites_dict = {'suites': []} + for k, v in self.__test_db.items(): + if 
class Myself:
    '''
    Class which represents this LUTF instance.
    It allows extraction of:
        - interfaces available
        - listen port
        - telnet port
        - name
        - hostname
        - LUTF type
    It provides an exit method to exit the LUTF instance
    '''
    def __init__(self, name, telnet_port):
        global preferences
        preferences = common.global_pref
        self.name = name
        self.__hostname = socket.gethostname()
        self.__lutf_telnet_server = None
        self.__lutf_telnet_port = telnet_port
        self.__lutf_listen_port = clutf_global.get_master_port()
        self.__lutf_type = clutf_global.get_lutf_type()
        lscpu = lutf_exec_local_cmd('/usr/bin/lscpu')
        self.__cpuinfo = yaml.load(lscpu[0].decode('utf-8'), Loader=yaml.FullLoader)
        cfg_path = clutf_global.get_lutf_cfg_file_path()
        if not cfg_path:
            raise LUTFError("No LUTF config file provided")
        with open(cfg_path, 'r') as f:
            self.lutf_cfg = yaml.load(f, Loader=yaml.FullLoader)
        config_ifs_num = MIN_IFS_NUM_DEFAULT
        logging.critical('CONFIGURATION CONTENT--->' + str(self.lutf_cfg))
        if "num_intfs" in self.lutf_cfg['lutf']:
            config_ifs_num = self.lutf_cfg['lutf']['num_intfs']
        if "lutf-env-vars" in self.lutf_cfg['lutf']:
            self.import_env_vars(self.lutf_cfg['lutf']['lutf-env-vars'])
        if "lustre-path" in self.lutf_cfg['lutf']:
            self.__lustre_base_path = os.path.split(self.lutf_cfg['lutf']['lustre-path'])[0]
            set_lustre_base_path(self.__lustre_base_path)
        else:
            self.__lustre_base_path = ''
        self.alias_list = self.provision_intfs(config_ifs_num)
        # delete any older test_progress files
        if os.path.isfile(self.get_test_progress_path()):
            os.remove(self.get_test_progress_path())

    def import_env_vars(self, fpath):
        '''
        Import "export KEY=VALUE" lines from a shell-style environment
        file into this process's environment.
        '''
        with open(fpath, 'r') as f:
            for line in f.readlines():
                if 'export ' in line:
                    s = line.replace('export ', '')
                    kv = s.split('=')
                    os.environ[kv[0].strip()] = kv[1].strip().strip('"')

    def get_lustre_base_path(self):
        return self.__lustre_base_path

    def get_test_progress_path(self):
        '''Path of the file which accumulates test progress output.'''
        if 'test-progress' in self.lutf_cfg['lutf']:
            path = self.lutf_cfg['lutf']['test-progress']
        else:
            path = clutf_global.get_lutf_tmp_dir()
            path = os.path.join(path, 'lutf_test_progress.out')
        return path

    def get_local_interface_names(self):
        return netifaces.interfaces()

    def get_local_interface_ip(self, name):
        return netifaces.ifaddresses(name)[netifaces.AF_INET][0]['addr']

    def get_local_interface_nm(self, name):
        return netifaces.ifaddresses(name)[netifaces.AF_INET][0]['netmask']

    def get_local_interface_bc(self, name):
        return netifaces.ifaddresses(name)[netifaces.AF_INET][0]['broadcast']

    def exit(self):
        '''
        Shutdown the LUTF
        '''
        if (len(self.alias_list) > 0):
            for alias in self.alias_list:
                del_ip_alias_cmd_str = "/usr/sbin/ip addr del " + alias
                rc = lutf_exec_local_cmd(del_ip_alias_cmd_str)
                ret_str = str(rc)
                if "ERROR" in ret_str:
                    # BUG FIX: typo "Uexpected" in the log message
                    error = "Unexpected result when deleting an alias ip: %s\n" % (ret_str)
                    logging.debug(error)
        print("Shutting down the LUTF")
        exit()

    def get_cpuinfo(self):
        return self.__cpuinfo

    def get_num_cpus(self):
        return self.__cpuinfo['CPU(s)']

    def get_num_numa_nodes(self):
        return self.__cpuinfo['NUMA node(s)']

    def list_intfs(self):
        '''
        Return a list of all the interfaces available on this node
        '''
        intfs = {'interfaces': {}}
        for intf in self.get_local_interface_names():
            try:
                intfs['interfaces'][intf] = {'ip': self.get_local_interface_ip(intf),
                                             'netmask': self.get_local_interface_nm(intf),
                                             'broadcast': self.get_local_interface_bc(intf)}
            except:
                # interfaces without an IPv4 address are skipped
                pass
        return intfs

    def dump_intfs(self):
        '''
        Dump the interfaces in YAML format
        '''
        print(yaml.dump(self.list_intfs(), sort_keys=False))

    def my_name(self):
        '''
        Return the symbolic name assigned to this LUTF instance
        '''
        return self.name

    def my_hostname(self):
        '''
        Return the hostname of this node
        '''
        return self.__hostname

    def my_type(self):
        '''
        Return the type of this LUTF instance
        '''
        if self.__lutf_type == EN_LUTF_MASTER:
            return 'MASTER'
        elif self.__lutf_type == EN_LUTF_AGENT:
            return 'AGENT'
        raise LUTFError("Undefined LUTF role: %d" % (self.__lutf_type))

    def my_telnetport(self):
        '''
        Return the telnet port of this LUTF instance
        '''
        return self.__lutf_telnet_port

    def my_listenport(self):
        '''
        Return the listen port of this LUTF instance
        '''
        return self.__lutf_listen_port

    def handle_rpc_req(self, rpc_yaml):
        '''
        Handle an incoming RPC request: a function call, a method call,
        or class instantiation/destruction. The result (or a formatted
        exception) is sent back to the originator.
        '''
        function_name = ''
        class_name = ''
        method_name = ''
        rc = {}

        y = yaml.load(rpc_yaml, Loader=yaml.FullLoader)
        # check to see if this is for me
        target = y['rpc']['dst']
        if target != self.name:
            logging.critical("RPC intended to %s but I am %s" % (target, self.name))
            return
        source = y['rpc']['src']
        name = os.path.split(os.path.splitext(y['rpc']['script'])[0])[1]
        path = os.path.split(os.path.splitext(y['rpc']['script'])[0])[0]
        if path not in sys.path:
            sys.path.append(path)
        rpc_type = y['rpc']['type']
        if rpc_type == 'function_call':
            function_name = y['rpc']['function']
        elif rpc_type == 'method_call':
            class_name = y['rpc']['class']
            method_name = y['rpc']['method']
            class_id = y['rpc']['class_id']
        elif rpc_type == 'instantiate_class' or rpc_type == 'destroy_class':
            class_name = y['rpc']['class']
            class_id = y['rpc']['class_id']
        else:
            raise LUTFError('Unexpected rpc')

        module = __import__(name)
        importlib.reload(module)
        args = y['rpc']['parameters']['args']
        kwargs = y['rpc']['parameters']['kwargs']
        lutf_exception_string = None
        try:
            if rpc_type == 'function_call':
                module_func = getattr(module, function_name)
                if hasattr(module_func, '__call__'):
                    rc = module_func(*args, **kwargs)
            elif rpc_type == 'instantiate_class':
                my_class = getattr(module, class_name)
                instance = my_class(*args, **kwargs)
                common.add_to_class_db(instance, class_id)
            elif rpc_type == 'destroy_class':
                instance = common.get_class_from_db(class_id)
                del(instance)
                common.del_entry_from_class_db(class_id)
            elif rpc_type == 'method_call':
                instance = common.get_class_from_db(class_id)
                if type(instance).__name__ != class_name:
                    raise LUTFError("requested class %s, but id refers to class %s" % (class_name, type(instance).__name__))
                rc = getattr(instance, method_name)(*args, **kwargs)
        except Exception as e:
            if type(e) == LUTFError:
                # LUTFError serializes cleanly; send it back as-is
                lutf_exception_string = e
            else:
                # format a traceback string for any other exception type
                exception_list = traceback.format_stack()
                exception_list = exception_list[:-2]
                exception_list.extend(traceback.format_tb(sys.exc_info()[2]))
                exception_list.extend(traceback.format_exception_only(sys.exc_info()[0], sys.exc_info()[1]))
                header = "Traceback (most recent call last):\n"
                stacktrace = "".join(exception_list)
                lutf_exception_string = header+stacktrace
        if lutf_exception_string:
            rc_yaml = populate_rpc_rsp(self.name, source, rc, lutf_exception_string)
        else:
            rc_yaml = populate_rpc_rsp(self.name, source, rc)
        lutf_send_rpc_rsp(source, yaml.dump(rc_yaml))

    def provision_intfs(self, num_intf_req):
        '''
        Ensure at least num_intf_req interfaces exist, creating IP alias
        interfaces on the last physical interface if needed. Returns the
        list of "ip addr add" parameter strings used, for cleanup on exit.
        '''
        # if there are enough interfaces, don't need to add aliases
        intfs_dict = self.list_intfs()
        intfs = list(intfs_dict['interfaces'].keys())
        num_available = len(intfs)
        if num_available >= num_intf_req:
            return []
        if num_available == 0:
            # robustness: intfs[-1] below would silently pick a bogus entry
            raise LUTFError("No configured network interfaces to alias")
        # add aliases for the last available interface
        base_intf_name = intfs[num_available-1]
        base_ip = netifaces.ifaddresses(base_intf_name)[netifaces.AF_INET][0]['addr']
        base_ip_netmask = netifaces.ifaddresses(base_intf_name)[netifaces.AF_INET][0]['netmask']
        base_ip_netmask_bits = sum(bin(int(x)).count('1') for x in base_ip_netmask.split('.'))
        intf_ip_alias = base_ip
        separator = '.'
        intf_ip_alias_split = intf_ip_alias.split(separator)
        ip_incr = 1
        alias_param_list = []
        for i in range(0, num_intf_req - num_available):
            intf_name_alias = base_intf_name + ":" + str(i)
            alias_confirmed = False
            intf_ip_alias_candidate_split = intf_ip_alias_split[:]

            # try to find available ip address
            while ip_incr < 254 and not alias_confirmed:
                # increment ip addr candidate
                intf_ip_alias_candidate_split[3] = str((int(intf_ip_alias_split[3])+ip_incr)%255)
                intf_ip_alias = separator.join(intf_ip_alias_candidate_split)
                ip_incr += 1
                try:
                    # an unreachable address means it is free for us to take
                    rc = lutf_exec_local_cmd("/usr/bin/ping -c 3 " + intf_ip_alias)
                    ret_str = str(rc)
                except Exception as e:
                    if "Host Unreachable" in str(e):
                        alias_confirmed = True
                        break
            if not alias_confirmed:
                error = "Failed to allocate ip address for alias if %s\n" % (intf_name_alias)
                logging.debug(error)
                return alias_param_list
            print("adding alias: ", intf_name_alias, " ip: ", intf_ip_alias)
            # build the command string for adding the alias, back up for clean-up on exit
            add_ip_alias_params = intf_ip_alias + "/" + str(base_ip_netmask_bits)
            add_ip_alias_params += " brd + dev " + base_intf_name + " label " + intf_name_alias
            add_ip_alias_cmd_str = "/usr/sbin/ip addr add " + add_ip_alias_params
            rc = lutf_exec_local_cmd(add_ip_alias_cmd_str)
            ret_str = str(rc)
            if "Error" in ret_str:
                # BUG FIX: typo "Uexpected" in the log message
                error = "Unexpected result when creating an alias ip: %s\n" % (ret_str)
                logging.debug(error)
                return alias_param_list
            alias_param_list.append(add_ip_alias_params)

        return alias_param_list


# Dump the global results to console or to file
def dumpGlobalTestResults(fname=None, status=None, desc=None):
    '''
    Dump the YAML results for tests which ran so far
    '''
    global global_test_results

    results = global_test_results.get()

    if fname:
        fpath = fname
        # if this is path then use it as is, otherwise put it in the tmp dir
        if os.sep not in fname:
            fpath = os.path.join(clutf_global.get_lutf_tmp_dir(), fname)
        with open(fpath, 'w') as f:
            f.write(yaml.dump(results,
                    Dumper=LutfDumper, indent=2,
                    sort_keys=False))
    else:
        print(yaml.dump(results, Dumper=LutfDumper, indent=2, sort_keys=False))
fname) + with open(fpath, 'w') as f: + f.write(yaml.dump(results, + Dumper=LutfDumper, indent=2, + sort_keys=False)) + else: + print(yaml.dump(results, Dumper=LutfDumper, indent=2, sort_keys=False)) + +def setup_paths(): + global lutf_tmp_dir + base_lustre = '' + + for p in LUTF_SCRIPT_PATHS: + path = os.path.join(clutf_global.get_lutf_path(),p) + if path not in sys.path: + sys.path.append(path) + lutf_tmp_dir = clutf_global.get_lutf_tmp_dir() + Path(lutf_tmp_dir).mkdir(parents=True, exist_ok=True) + +logging.basicConfig(filename=os.path.join(clutf_global.get_lutf_tmp_dir(), "lutf_py.log"), + filemode='w') +setup_paths() + +# All test results are stored in here +# Accessor functions can be used to dump it. +global_test_results = YamlGlobalTestResults() + +suites = TestSuites() + +agents = lutf_agent.LutfAgents() + +logging.critical("INSTANTIATING myself") +me = Myself(clutf_global.get_node_name(), + clutf_global.get_agent_telnet_port()) + +# Convenience Variables +R = dumpGlobalTestResults +A = agents.dump +I = me.dump_intfs +X = me.exit + +preferences = load_pref() +# set debug level +set_logging_level('debug') + diff --git a/lustre/tests/lutf/python/infra/lutf_agent.py b/lustre/tests/lutf/python/infra/lutf_agent.py new file mode 100644 index 0000000..8b7ba4f --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_agent.py @@ -0,0 +1,137 @@ +from clutf_agent import * +from lutf_common_def import * +from lutf_exception import LUTFError +import yaml, logging, sys, ctypes + +class Agent: + def __init__(self, name, hostname, id, ip, telnet_port, node_type): + self.name = name + pref = load_pref() + self.timeout = pref['RPC timeout'] + #logging.debug('RPC timeout set to: %d' % (self.timeout)) + if node_type == EN_LUTF_MASTER: + nt = 'MASTER' + elif node_type == EN_LUTF_AGENT: + nt = 'AGENT' + else: + raise LUTFError("Undefined node type %d for agent %s" % (node_type, name)) + self.info = {name : {'hostname': hostname, 'ip': ip, 'id': id, + 'telnet-port': telnet_port, + 
'node-type': nt}} + + def get(self): + return self.info + + def get_ip(self): + return self.info[self.name]['ip'] + + def get_hostname(self): + return self.info[self.name]['hostname'] + + def get_telnet_port(self): + return self.info[self.name]['telnet-port'] + + def set_rpc_timeout(self, timeout): + self.timeout = timeout + + def send_rpc(self, rpc_type, src, script, cname, + mname, fname, *args, **kwargs): + + if not mname and not fname: + raise LUTFError("A method or a function name need to be specified") + + rpc = populate_rpc_req(src, self.name, rpc_type, script, cname, + mname, fname, *args, **kwargs) + y = yaml.dump(rpc) + #by = y.encode('utf-8') + rc, yaml_txt = lutf_send_rpc(self.name, y, self.timeout) + y = yaml.load(yaml_txt, Loader=yaml.FullLoader) + # sanity check + target = y['rpc']['dst'] + if target != src: + raise LUTFError("RPC intended to %s but I am %s" % (target, src)) + + source = y['rpc']['src'] + if source != self.name: + raise LUTFError("RPC originated from %s but expected from %s" % + (source, self.name)) + + if y['rpc']['type'] == 'failure': + raise LUTFError('RPC failure') + elif y['rpc']['type'] == 'exception': + if type(y['rpc']['exception']) == str: + raise LUTFError(nname=source, msg=y['rpc']['exception']) + else: + raise y['rpc']['exception'] + + return y['rpc']['rc'] + + def dump(self): + print(yaml.dump(self.info, sort_keys=False)) + +class LutfAgents: + """ + A class to access all agents. 
This is useful to get a view of all agents currently connected + """ + def __init__(self): + self.agent_dict = {} + self.max = 0 + self.n = 0 + self.reload() + + def __iter__(self): + self.n = 0 + return self + + # needed for python 3.x + def __next__(self): + if self.n < self.max: + key = list(self.agent_dict.keys())[self.n] + agent = self.agent_dict[key] + self.n += 1 + return key, agent + else: + raise StopIteration + + def __getitem__(self, key): + try: + rc = self.agent_dict[key] + except: + raise LUTFError('no entry for', key) + return rc + + def keys(self): + self.reload() + return list(self.agent_dict.keys()) + + def values(self): + self.reload() + return list(self.agent_dict.values()) + + def reload(self): + self.agent_dict = {} + self.max = 0 + for x in range(0, MAX_NUM_AGENTS): + agent = find_agent_blk_by_id(x) + if agent: + self.agent_dict[agent.name] = Agent(agent.name, + agent.hostname, x, + agent_ip2str(agent), + agent.telnet_port, + agent.node_type) + release_agent_blk(agent, False) + self.max += 1 + + # always update the dictionary for the following two operations + def dump(self): + self.reload() + for k, v in self.agent_dict.items(): + v.dump() + + def enable_hb_check(self): + agent_enable_hb() + + def disable_hb_check(self): + agent_disable_hb() + + diff --git a/lustre/tests/lutf/python/infra/lutf_basetest.py b/lustre/tests/lutf/python/infra/lutf_basetest.py new file mode 100644 index 0000000..50785f1 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_basetest.py @@ -0,0 +1,94 @@ +import lutf_agent +import clutf_global +import paramiko, logging, time +from lutf_exception import LUTFError +from lutf_paramiko import lutf_put_file +from lutf_common_def import load_pref + +LUTF_TEST_PASS = 0 +LUTF_TEST_FAIL = -1 +LUTF_TEST_SKIP = -2 + +class BaseTest(object): + # TODO the idea of the *args and **kwargs in the __init__ method is for subclasses + # to pass all their arguments to the super() class. 
Then the superclass can then pass + # that to the remote, so the remote class can be instantiated appropriately + def __init__(self, script, target=None, *args, **kwargs): + self.__remote = False + # if a target is specified other than me then we're going + # to execute on that target + if target and target != clutf_global.get_node_name(): + agents = lutf_agent.LutfAgents() + if target not in agents.keys(): + raise LUTFError("%s not a known agent" % target) + self.__script = script + self.__remote = True + self.__target = target + self.__class_id = time.time() + agent = agents[self.__target] + pref = load_pref() + if pref['remote copy']: + logging.debug("Putting script %s on %s:%s" % (self.__script, agent.get_hostname(), agent.get_ip())) + lutf_put_file(agent.get_ip(), self.__script, self.__script) + # tell the remote to instantiate the class and keep it around + agent = agents[self.__target] + agent.send_rpc('instantiate_class', clutf_global.get_node_name(), + self.__script, type(self).__name__, '__init__', '', + self.__class_id, *args, **kwargs) + + def __getattribute__(self, name): + attr = object.__getattribute__(self, name) + if hasattr(attr, '__call__'): + def newfunc(*args, **kwargs): + if self.__remote: + # execute on the remote defined by: + # self.target + # attr.__name__ = name of method + # type(self).__name__ = name of class + agents = lutf_agent.LutfAgents() + if self.__target not in agents.keys(): + raise LUTFError("%s not a known agent: %s" % (self.__target, str(agents.keys()))) + agent = agents[self.__target] + result = agent.send_rpc('method_call', + clutf_global.get_node_name(), + self.__script, + type(self).__name__, + attr.__name__, '', + self.__class_id, + *args, **kwargs) + else: + result = attr(*args, **kwargs) + return result + return newfunc + else: + return attr + + def __del__(self): + try: + # signal to the remote that the class is being destroyed + if self.__remote: + agents = lutf_agent.LutfAgents() + if self.__target not in 
agents.keys(): + raise LUTFError("%s not a known agent" % target) + agent = agents[self.__target] + agent.send_rpc('destroy_class', clutf_global.get_node_name(), + self.__script, type(self).__name__, '__del__', '', + self.__class_id) + except: + pass + +def lutfrc(error, *args, **kwargs): + rc = {} + if error == -1: + rc['status'] = 'FAIL' + elif error == -2: + rc['status'] = 'SKIP' + else: + rc['status'] = 'PASS' + if len(args): + rc['args'] = list(args) + if len(kwargs): + rc['kwargs'] = kwargs + return rc + + diff --git a/lustre/tests/lutf/python/infra/lutf_cmd.py b/lustre/tests/lutf/python/infra/lutf_cmd.py new file mode 100644 index 0000000..4d29a87 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_cmd.py @@ -0,0 +1,51 @@ +import os, shlex, subprocess, threading, ctypes, time, logging +from lutf_exception import LUTFError +import lutf_common_def + +def exec_cmd(cmd, exception=True): + if lutf_common_def.is_cmd_verbosity(): + logging.critical("executing -> " + cmd) + args = shlex.split(cmd) + try: + out = subprocess.Popen(args, stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + except Exception as e: + logging.critical("Failed to execute cmd: " + cmd) + logging.critical(e) + return [None, -1] + t = out.communicate()[0],out.returncode + if t[1] != 0 and exception: + raise LUTFError(cmd+"\n"+"rc = "+str(t[1])+"\n"+t[0].decode("utf-8")) + elif t[1] != 0: + logging.critical("Failed to execute cmd: " + cmd + " with rc = " + str(t[1])) + return t + +class LutfCmd(threading.Thread): + def __init__(self, name, cmd, exception=False): + threading.Thread.__init__(self) + self.name = name + self.cmd = cmd + self.thread_id = threading.get_ident() + self.rc = None + self.exception = exception + + def run(self): + self.rc = exec_cmd(self.cmd, exception=self.exception) + + def raise_exception(self): + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(self.thread_id, + ctypes.py_object(SystemExit)) + if res > 1: + 
ctypes.pythonapi.PyThreadState_SetAsyncExc(self.thread_id, 0) + +def lutf_exec_local_cmd(cmd, expire=0, exception=True): + if expire <= 0 or not type(expire) == int: + return exec_cmd(cmd, exception=exception) + cmd_thrd = LutfCmd('lutf_cmd', cmd, exception=exception) + cmd_thrd.start() + time.sleep(expire) + if cmd_thrd.isAlive(): + cmd_thrd.raise_exception() + if not cmd_thrd.rc: + raise StopIteration(cmd+"\nExpired") + return cmd_thrd.rc diff --git a/lustre/tests/lutf/python/infra/lutf_common_def.py b/lustre/tests/lutf/python/infra/lutf_common_def.py new file mode 100644 index 0000000..42cf857 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_common_def.py @@ -0,0 +1,226 @@ +import clutf_global +from lutf_exception import LUTFError, LutfDumper +import logging, os, yaml, shutil + +LUSTRE_BASE_PATH = '' +LUTF_STATUS_STRING = 'LUTF STATUS: ' +LUTF_STATUS_SUCCESS = 'Success' +LUTF_STATUS_FAILURE = 'Failure' +LUTF_STATUS_IGNORE = 'Ignore' +LUTF_CODE_STRING = 'LUTF CODE: ' +MASTER_PORT = 8494 +MASTER_DAEMON_PORT = 8495 +AGENT_DAEMON_PORT = 8094 +LUTF_SCRIPT_PATHS = ['src/', + 'python/', + 'python/tests-infra', + 'python/infra', + 'python/config', + 'python/tests/'] +MIN_IFS_NUM_DEFAULT = 3 + +def get_rpc_rsp_base(): + return {'rpc': {'dst': None, 'src': None, 'type': 'results', 'rc': None}} + +def get_rpc_req_base(): + return {'rpc': {'src': None, 'dst': None, 'type': None, 'script': None, + 'class': None, 'method': None, 'function': None, + 'parameters': {'args': None, 'kwargs': None}}} + +global_class_db = {} + +def add_to_class_db(instance, class_id): + if class_id in global_class_db: + raise LUTFError("Duplicate class_id. 
Contention in timing") + logging.debug("created instance for %s with id %f" % (type(instance).__name__, class_id)) + global_class_db[class_id] = instance + +def get_class_from_db(class_id): + if class_id in global_class_db: + return global_class_db[class_id] + logging.debug("Request for class not in the database %f" % class_id) + +def del_entry_from_class_db(class_id): + if class_id in global_class_db: + instance = global_class_db[class_id] + logging.debug("destroyed instance for %s with id %f" % (type(instance).__name__, class_id)) + del(global_class_db[class_id]) + +def dump_class_db(): + for k, v in global_class_db.items(): + logging.debug("id = %f, name = %s" % (k, type(v).__name__)) + +def populate_rpc_req(src, dst, req_type, script, cname, + mname, fname, class_id, *args, **kwargs): + rpc = get_rpc_req_base() + rpc['rpc']['src'] = src + rpc['rpc']['dst'] = dst + rpc['rpc']['type'] = req_type + rpc['rpc']['script'] = script + rpc['rpc']['class'] = cname + rpc['rpc']['method'] = mname + rpc['rpc']['function'] = fname + rpc['rpc']['class_id'] = class_id + rpc['rpc']['parameters']['args'] = args + rpc['rpc']['parameters']['kwargs'] = kwargs + return rpc + +def populate_rpc_rsp(src, dst, rc, exception=None): + rpc = get_rpc_rsp_base() + rpc['rpc']['src'] = src + rpc['rpc']['dst'] = dst + if exception: + rpc['rpc']['type'] = 'exception' + rpc['rpc']['exception'] = exception + else: + rpc['rpc']['type'] = 'response' + rpc['rpc']['rc'] = rc + return rpc + +GLOBAL_PREF_DEF = {'editor': shutil.which('vim'), 'loglevel': 'debug', + 'halt_on_exception': False, 'remote copy': False, + 'RPC timeout': 300, 'num_intfs': MIN_IFS_NUM_DEFAULT, + 'cmd verbosity': True} + +global_pref = GLOBAL_PREF_DEF + +global_pref_file = os.path.join(clutf_global.get_lutf_tmp_dir(), 'lutf_pref.yaml') + +def set_editor(editor): + ''' + Set the text base editor to use for editing scripts + ''' + global global_pref + if shutil.which(editor): + global_pref['editor'] = shutil.which(editor) + else: + 
def set_halt_on_exception(exc):
    '''
    Set halt_on_exception.
    True for raising exception and halting test progress
    False for continuing test progress
    '''
    global global_pref

    if not isinstance(exc, bool):
        logging.critical("Must be True or False")
        global_pref['halt_on_exception'] = False
        return
    global_pref['halt_on_exception'] = exc
    save_pref()

def set_rpc_timeout(timeout):
    '''
    Set the RPC timeout in seconds.
    That's the timeout to wait for the operation to complete on the remote end.
    '''
    global global_pref
    global_pref['RPC timeout'] = timeout
    save_pref()

def get_rpc_timeout():
    '''
    Get the RPC timeout in seconds.
    That's the timeout to wait for the operation to complete on the remote end.
    '''
    return global_pref['RPC timeout']

def set_lustre_base_path(path):
    '''Remember the base path of the Lustre tree.'''
    global LUSTRE_BASE_PATH
    LUSTRE_BASE_PATH = path

def get_lustre_base_path():
    '''Return the remembered Lustre tree base path.'''
    return LUSTRE_BASE_PATH

def set_script_remote_cp(enable):
    '''
    set the remote copy feature
    If True then scripts will be remote copied to the agent prior to execution
    '''
    global global_pref
    global_pref['remote copy'] = enable
    save_pref()

def set_logging_level(level):
    '''
    Set Python log level. One of: critical, debug, error, fatal
    '''
    global global_pref

    try:
        # resolve the symbolic level name against the logging module and
        # apply it to the root logger
        logging.getLogger('').setLevel(getattr(logging, level.upper()))
        global_pref['loglevel'] = level
    except:
        logging.critical("Log level must be one of: critical, debug, error, fatal")
    save_pref()

def set_cmd_verbosity(value):
    '''
    Set the shell command verbosity to either on or off. If on, then
    all the shell commands will be written to the debug logging.
    '''
    global global_pref
    global_pref['cmd verbosity'] = (value.upper() == 'ON')
    save_pref()
def is_cmd_verbosity():
    '''
    True if command verbosity is set, False otherwise.
    '''
    return global_pref['cmd verbosity']

def load_pref():
    '''
    Load the LUTF preferences.
        editor - the editor of choice to use for editing scripts
        halt_on_exception - True to throw an exception on first error
                            False to continue running scripts
        log_level - Python log level. One of: critical, debug, error, fatal
    '''
    global global_pref

    if os.path.isfile(global_pref_file):
        with open(global_pref_file, 'r') as f:
            loaded = yaml.load(f, Loader=yaml.FullLoader)
        if not loaded:
            # empty or unparsable file: fall back to the defaults
            global_pref = GLOBAL_PREF_DEF
        else:
            global_pref = loaded
            # backfill any entries missing from an older preference file
            for key, value in GLOBAL_PREF_DEF.items():
                global_pref.setdefault(key, value)
    save_pref()
    return global_pref

def save_pref():
    '''
    Save the LUTF preferences.
        editor - the editor of choice to use for editing scripts
        halt_on_exception - True to throw an exception on first error
                            False to continue running scripts
        log_level - Python log level. One of: critical, debug, error, fatal
    '''
    with open(global_pref_file, 'w') as f:
        yaml_text = yaml.dump(global_pref, Dumper=LutfDumper, indent=2, sort_keys=False)
        f.write(yaml_text)
class LUTFError(Exception):
    """LUTF infrastructure error.

    Captures, at raise time, the node name, a message, an optional
    arbitrary argument, the caller's source location and a formatted
    stack trace.  Instances round-trip through YAML via the
    representer/constructor registered in this module.
    """
    def __init__(self, msg='', arg=None, halt=False, nname=None):
        # Resolve the node name lazily.  The original used
        # nname=clutf_global.get_node_name() as the default, which is
        # evaluated once at import time and freezes the node name.
        if nname is None:
            nname = clutf_global.get_node_name()
        self.node_name = nname
        self.msg = msg
        self.arg = arg
        self.halt = halt
        # Location of the raise site (the caller of this constructor).
        self.filename, self.lineno, self.function, self.code_context, self.index = \
            getframeinfo(currentframe().f_back)
        # Bug fix: the module never imported sys, so the sys.exc_info()
        # calls below raised NameError.
        import sys
        exception_list = traceback.format_stack()[:-2]
        etype, evalue, etb = sys.exc_info()
        if etype is not None:
            # Only meaningful while an exception is being handled; the
            # original appended "None" noise otherwise.  Its initial
            # "Traceback (most recent call last):" header assignment was
            # dead (immediately overwritten) and is dropped.
            exception_list.extend(traceback.format_tb(etb))
            exception_list.extend(traceback.format_exception_only(etype, evalue))
        self.stacktrace = "".join(exception_list)

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        output = {'LUTFError': {'node-name': self.node_name, 'msg': self.msg,
                                'arg': self.arg, 'file name': self.filename,
                                'line number': self.lineno, 'function': self.function}}
        try:
            return yaml.dump(output, Dumper=LutfDumper, indent=2, sort_keys=False)
        except Exception as e:
            # Bug fix: the original fell through and returned an unbound
            # local, raising UnboundLocalError on top of the dump error.
            print(type(e), e)
            return str(output)

    def populate(self, node_name, msg, arg, halt, filename, lineno, function,
                 code_context, index, stacktrace):
        """Fill in all fields explicitly (used by the YAML constructor)."""
        self.node_name = node_name
        self.msg = msg
        self.arg = arg
        self.halt = halt
        self.filename = filename
        self.lineno = lineno
        self.function = function
        self.code_context = code_context
        self.index = index
        self.stacktrace = stacktrace

    def print_exception_info(self):
        """Print the raise-site location captured at construction."""
        print("Exception at: ", self.filename, ":", self.lineno, ":", self.function)

    def print_error_msg(self):
        print(self.msg)

    def get_arg(self):
        """Return the optional argument attached to this error."""
        return self.arg
script=os.path.abspath(__file__), + full_path=False, target=None): + if not full_path: + self.fname = os.path.join(os.getcwd(), fname) + else: + self.fname = fname + super().__init__(script, target, self.fname, full_path=full_path) + self.file_handle = None + + def open(self, mode): + self.file_handle = open(self.fname, mode) + + def write(self, data): + if not self.file_handle or self.file_handle.closed: + raise LUTFError("%s not opened" % self.fname) + self.file_handle.write(data) + + def get_full_path(self): + return self.fname + + def readlines(self): + lines = self.file_handle.readlines() + return lines + + def read(self): + data = self.file_handle.read() + return data + + def isclosed(self): + if self.file_handle: + return self.file_handle.closed + return True + + def close(self): + if self.file_handle and not self.file_handle.closed: + self.file_handle.close() + + def remove(self): + if self.file_handle and not self.file_handle.closed: + self.file_handle.close() + os.remove(self.fname) + else: + os.remove(self.fname) + self.fname = '' + self.file_handle = None + + def find_replace_file(self, fname, search, replace, getline=False): + count = 0 + found_line = None + if not self.isclosed(): + raise LUTFError("Can not perform operation on file. 
Close it first") + + # if no replace is provided then just count the instances of the + # search string + if not replace: + with open(fname) as old_file: + for line in old_file: + if search in line: + if not found_line: + found_line = line + count += 1 + if getline: + return count, found_line + return count + + fh, abs_path = tempfile.mkstemp() + with os.fdopen(fh,'w') as new_file: + with open(fname) as old_file: + for line in old_file: + if search in line: + if not found_line: + found_line = line + new_file.write(replace) + count += 1 + else: + new_file.write(line) + #Copy the file permissions from the old file to the new file + shutil.copymode(fname, abs_path) + #Remove original file + os.remove(fname) + #Move new file + shutil.move(abs_path, fname) + + if getline: + return count, found_line + return count + + def find_replace_global(self, directory, search, replace, getline=False): + count = 0 + lines = [] + for filename in os.listdir(directory): + fpath = os.path.join(directory, filename) + if getline: + c, line = self.find_replace_file(fpath, search, replace, getline) + if c >= 1: + lines.append(line) + count += c + else: + count += self.find_replace_file(fpath, search, replace, getline) + if getline: + return count, lines + return count + + def find_replace(self, search, replace): + count = 0 + if os.path.isdir(self.fname): + count = self.find_replace_global(self.fname, search, replace) + elif os.path.isfile(self.fname): + count = self.find_replace_file(self.fname, search, replace) + return count + + def find(self, search): + count = 0 + if os.path.isdir(self.fname): + count = self.find_replace_global(self.fname, search, None) + elif os.path.isfile(self.fname): + count = self.find_replace_file(self.fname, search, None) + return count + + def get(self, search): + count = 0 + if os.path.isdir(self.fname): + count, line = self.find_replace_global(self.fname, search, None, getline=True) + elif os.path.isfile(self.fname): + count, line = 
self.find_replace_file(self.fname, search, None, getline=True) + return count, line + diff --git a/lustre/tests/lutf/python/infra/lutf_get_num_agents.py b/lustre/tests/lutf/python/infra/lutf_get_num_agents.py new file mode 100644 index 0000000..0162bcd --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_get_num_agents.py @@ -0,0 +1,10 @@ +from clutf_agent import * + +def get_num_agents(): + i = 0 + for x in range(0, MAX_NUM_AGENTS): + agent = find_agent_blk_by_id(x) + if agent: + i = i + 1 + release_agent_blk(agent, False) + return i diff --git a/lustre/tests/lutf/python/infra/lutf_paramiko.py b/lustre/tests/lutf/python/infra/lutf_paramiko.py new file mode 100644 index 0000000..5f59646 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_paramiko.py @@ -0,0 +1,47 @@ +import paramiko, logging + +def lutf_get_file(target, rfile, sfile): + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=target, timeout=3, banner_timeout=3, auth_timeout=3, username='root') + sftp = ssh.open_sftp() + + logging.debug("Commencing get %s -> %s" % (rfile, sfile)) + sftp.get(rfile, sfile) + + sftp.close() + ssh.close() + +def lutf_put_file(target, sfile, rfile): + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=target, timeout=3, banner_timeout=3, auth_timeout=3, username='root') + sftp = ssh.open_sftp() + + logging.debug("Commencing put %s -> %s" % (sfile, rfile)) + sftp.put(sfile, rfile) + + sftp.close() + ssh.close() + +def lutf_exec_remote_cmd(cmd, host, ignore_err=False): + ssh = paramiko.SSHClient() + ssh.load_system_host_keys() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=host, timeout=3, banner_timeout=3, auth_timeout=3, username='root') + stdin, stdout, stderr = ssh.exec_command(cmd) + + error = False + rc = '' + for line in stderr.read().splitlines(): + if 
class Interactive_remote(telnetlib.Telnet):
    """Creates an interactive remote session.

    Takes the following parameters:
    host: string of hostname or IP address, ex: "localhost" or "127.0.0.1"
    port: port to use to connect to the host
    timeout (optional): timeout before failure.

    NOTE(review): on Python 3 the telnetlib option constants (IAC, DONT,
    ECHO, ...) are bytes while this class mixes them with str; confirm
    the intended interpreter before relying on option negotiation.
    """
    def __init__(self, host, port, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        telnetlib.Telnet.__init__(self, None, 0, timeout)
        # Bug fix: write()/wait_for()/interact() consult send_buf, but
        # this class never initialized it (only telnet_cl did), causing
        # AttributeError on the first echo-filter check.
        self.send_buf = ''
        try:
            self.open(host, port, timeout)
        except IOError as e:
            print('*** Unable to open Connection: ', e.errno, ':', e.strerror, ' Please try later ***')
            return
        self.write(telnetlib.IAC + telnetlib.DONT + telnetlib.ECHO)
        self.interact()

    def echo_off(self):
        """Turn off echo on the server end. This tells the server to not echo
        what the client sends to it.

        """
        self.write(telnetlib.IAC + telnetlib.DONT + telnetlib.ECHO)

    def setup_conn(self):
        """Negotiate echo, suppress-go-ahead, environment and terminal type."""
        # setup echo
        self.write_raw(telnetlib.IAC + telnetlib.DO + telnetlib.ECHO)
        # suppress go ahead
        self.write_raw(telnetlib.IAC + telnetlib.DO + telnetlib.SGA)
        self.write_raw(telnetlib.IAC + telnetlib.WILL + telnetlib.SGA)
        # setup a new environment
        self.write_raw(telnetlib.IAC + telnetlib.WILL + telnetlib.NEW_ENVIRON)
        # setup terminal type to xterm
        self.write_raw(telnetlib.IAC + telnetlib.WILL + telnetlib.TTYPE)
        self.write_raw(telnetlib.IAC + telnetlib.SB + telnetlib.TTYPE + chr(0) + "XTERM" + telnetlib.IAC + telnetlib.SE)

    def write_raw(self, buffer):
        """Send *buffer* verbatim, without IAC escaping."""
        self.msg("send %r", buffer)
        self.sock.sendall(buffer)

    def write(self, buffer):
        """Write a string to the socket, doubling any IAC characters.

        Can block if the connection is blocked. May raise
        socket.error if the connection is closed.

        """
        if telnetlib.IAC in buffer:
            buffer = buffer.replace(telnetlib.IAC, telnetlib.IAC + telnetlib.IAC)
        else:
            # Remember what we sent so interact()/wait_for() can filter
            # the server's echo of it out of the input stream.
            self.send_buf = buffer.replace('\n', '\r\n')
        self.msg("send %r", buffer)
        self.sock.sendall(buffer)

    def read(self):
        """Non-blocking read: return any immediately available text, or
        None when nothing is pending or the connection dropped."""
        # TODO: need to see how to get the timeouts working properly
        text = None
        rfd, wfd, xfd = select.select([self], [], [], 0)
        if self in rfd:
            try:
                text = self.read_eager()
            except EOFError:
                print('*** Connection closed by remote host ***')
            except IOError as e:
                print('*** Connection closed: ', e.errno, ':', e.strerror, ' ***')
        # Bug fix: the original could return an unbound local here when
        # the socket was not readable or the read raised.
        return text

    def wait_for(self, wait_text):
        """Consume input until *wait_text* has been seen in full; return
        everything read along the way."""
        rc_text = ''
        while 1:
            # Bug fix: reset per iteration; the original referenced an
            # unbound "text" when only stdin became readable.
            text = None
            rfd, wfd, xfd = select.select([self, sys.stdin], [], [])
            if self in rfd:
                try:
                    text = self.read_eager()
                except EOFError:
                    print('*** Connection closed by remote host ***')
                    break
                except IOError as e:
                    print('*** Connection closed: ', e.errno, ':', e.strerror, ' ***')
                    break
            if text:
                rc_text += text
                if (len(wait_text) > 0) and (wait_text.find(text) == 0):
                    wait_text = wait_text.replace(text, '')
                    if (len(wait_text) == 0):
                        break
                elif (len(self.send_buf) > 0) and (wait_text in text):
                    break
        return rc_text

    def interact(self):
        """Enter interactive mode with the remote end

        """
        self.shutdown = False
        while (not self.shutdown):
            rfd, wfd, xfd = select.select([self, sys.stdin], [], [])
            if self in rfd:
                text = None
                try:
                    text = self.read_eager()
                except EOFError:
                    print('*** Connection closed by remote host ***')
                    break
                except IOError as e:
                    print('*** Connection closed: ', e.errno, ':', e.strerror, ' ***')
                    break
                if text:
                    # filter out the echo of what we sent ourselves
                    if (len(self.send_buf) > 0) and (self.send_buf.find(text) == 0):
                        self.send_buf = self.send_buf.replace(text, '')
                        continue
                    elif (len(self.send_buf) > 0) and (self.send_buf in text):
                        text = text.replace(self.send_buf, '', 1)
                        self.send_buf = ''
                    sys.stdout.write(text)
                    sys.stdout.flush()
            if sys.stdin in rfd:
                line = sys.stdin.readline()
                if ((line.find("exit") > -1) or (line.find("quit") > -1)) and (line.__len__() == 5):
                    self.shutdown = True
                if not line:
                    break
                self.write(line)
        self.close()
+ + """ + def __init__(self, host, port, + timeout=socket._GLOBAL_DEFAULT_TIMEOUT): + telnetlib.Telnet.__init__(self, None, 0, timeout) + try: + self.open(host, port, timeout) + except IOError as e: + print('*** Unable to open Connection: ', e.errno, ':', e.strerror, ' Please try later ***') + return + self.set_option_negotiation_callback(self.handle_option) + self.send_buf = '' + + def read_all(self): + text = "" + t = self.read() + while t != None: + text += t + t = self.read() + return text + + def handle_option(self, sock, cmd, opt): + # ignore any options + self.sock = sock + self.cmd = cmd + self.opt = opt + #if ((cmd == telnetlib.WILL) and (opt == telnetlib.ECHO)): + # self.write_raw(telnetlib.IAC + telnetlib.DONT + telnetlib.ECHO) diff --git a/lustre/tests/lutf/python/infra/lutf_telnet_sr.py b/lustre/tests/lutf/python/infra/lutf_telnet_sr.py new file mode 100644 index 0000000..4a6c2c0 --- /dev/null +++ b/lustre/tests/lutf/python/infra/lutf_telnet_sr.py @@ -0,0 +1,164 @@ +import code +import telnetsrvlib +import socketserver +import logging +import sys +import threading +from lutf import me, suites, agents, global_test_results + +g_tns = None + +class TNS(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + +class TNH(telnetsrvlib.TelnetHandler): + def __init__(self, request, client_address, server): + self.DOECHO = False + self.interact = False + self.console = None + self.old_stdout = None + self.more = False + self.PROMPT = "lutf### " + telnetsrvlib.TelnetHandler.__init__(self, request, client_address, server) + + # Overide the handler to work with the interactive console that + # we will create + def handle(self): + "The actual service to which the user has connected." 
+ username = None + password = None + if self.authCallback: + if self.authNeedUser: + if self.DOECHO: + self.write("Username: ") + username = self.readline() + if self.authNeedPass: + if self.DOECHO: + self.write("Password: ") + password = self.readline(echo=False) + if self.DOECHO: + self.write("\n") + try: + self.authCallback(username, password) + except: + return + while self.RUNSHELL: + if self.DOECHO: + self.write(self.PROMPT) + if self.interact != True: + cmdlist = [item.strip() for item in self.readline().split()] + idx = 0 + while idx < (len(cmdlist) - 1): + if cmdlist[idx][0] in ["'", '"']: + cmdlist[idx] = cmdlist[idx] + " " + cmdlist.pop(idx+1) + if cmdlist[idx][0] != cmdlist[idx][-1]: + continue + cmdlist[idx] = cmdlist[idx][1:-1] + idx = idx + 1 + if cmdlist: + cmd = cmdlist[0].upper() + params = cmdlist[1:] + if cmd in self.COMMANDS: + try: + self.COMMANDS[cmd](params) + except: + (t, p, tb) = sys.exc_info() + if self.handleException(t, p, tb): + break + else: + self.write("Unknown command '%s'\n" % cmd) + else: + try: + try: + line = self.raw_input(self.PROMPT) + logging.debug(line) + #self.write(line) + encoding = getattr(sys.stdin, "encoding", None) + #logging.debug(encoding) + if encoding and not isinstance(line, str): + line = line.decode(encoding) + except EOFError: + self.write("\n") + break + else: + if (not self.more and (line == "logout" or line == "quit" or line == "exit")): + self.interact = False + self.PROMPT = "lutf### " + sys.stdout = self.old_stdout + else: + self.more = self.console.push(line) + if self.more: + self.PROMPT = "lutf... " + else: + self.PROMPT = "lutf>>> " + except KeyboardInterrupt: + self.write("\nKeyboardInterrupt\n") + self.console.resetbuffer() + self.more = 0 + logging.debug("Exiting handler") + def raw_input(self, prompt=""): + #self.write(prompt) + return self.readline() + def cmdECHO(self, params): + """ [ ...] + Echo parameters + Echo command line parameters back to user, one per line. 
+ """ + self.writeline("Parameters:") + for item in params: + self.writeline("\t%s" % item) + def cmdINTERACT(self, params): + """ + Enter python interactive mode + The main LUTF program should've set the globals properly + """ + self.interact = True + vars = globals().copy() + vars.update(locals()) + self.console = code.InteractiveConsole(vars) + self.console.raw_input = self.raw_input + self.console.write = self.write + self.PROMPT = "lutf>>> " + self.old_stdout = sys.stdout + sys.stdout = self + #logging.debug("cmdINTERACT complete") + + def cmdSHUTDOWN(self, params): + """ + Shutdown the LUTF Daemon + """ + global g_tns + g_tns.stop() + self.RUNSHELL = False + self.writeline("Goodbye") + +#class TNS1(socketserver.TCPServer): +# allow_reuse_address = True + +#class TNH1(telnetsrvlib.TelnetHandler): +# def __init__(self, request, client_address, server): +# print "calling TNH1.constructore" +# telnetsrvlib.TelnetHandler.__init__(self, request, client_address, server) +# def cmdECHO(self, params): +# """ [ ...] +# Echo parameters +# Echo command line parameters back to user, one per line. 
# Bug fix: ctypes was used by LutfThread.raise_exception() but never
# imported anywhere in this module.
import random, os, threading, ctypes

class LutfThread(threading.Thread):
    '''
    Thread wrapper that runs function(*args, **kwargs) and keeps the
    function's return value in self.rc.
    '''
    def __init__(self, name, function, exception=False, *args, **kwargs):
        threading.Thread.__init__(self)
        self.name = name
        # Id of the thread actually executing function; filled in by
        # run().  Bug fix: the original recorded the *creator's* thread
        # id here, so raise_exception() always targeted the wrong thread.
        self.thread_id = None
        self.rc = None
        self.exception = exception
        self.args = args
        self.kwargs = kwargs
        self.function = function

    def run(self):
        self.thread_id = threading.get_ident()
        self.rc = self.function(*self.args, **self.kwargs)

    def raise_exception(self):
        '''
        Asynchronously raise SystemExit in the running thread to force
        it to exit.
        '''
        if self.thread_id is None:
            # The thread never started; nothing to interrupt.
            return
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(self.thread_id), ctypes.py_object(SystemExit))
        if res > 1:
            # More than one thread affected: undo the damage.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.thread_id), 0)

def generate_random_int_array(size, minimum=1, maximum=3000):
    '''Return *size* distinct random integers from [minimum, maximum).'''
    return random.sample(range(minimum, maximum), size)

def generate_random_bytes(size):
    '''Return *size* random bytes suitable for test payloads.'''
    return os.urandom(size)
If + there is none, no un/pw is requested. Should + raise an exception if authentication fails + Default: None + authNeedUser = Should a username be requested? + Default: False + authNeedPass = Should a password be requested? + Default: False + COMMANDS = Dictionary of supported commands + Key = command (Must be upper case) + Value = List of (function, help text) + Function.__doc__ should be long help + Function.aliases may be a list of alternative spellings +""" + +#from telnetlib import IAC, WILL, WONT, DO, DONT, ECHO, SGA, Telnet +import threading +import socketserver +import socket +import time +import sys +import traceback +import curses.ascii +import curses.has_key +import curses +import logging +import re +if not hasattr(socket, 'SHUT_RDWR'): + socket.SHUT_RDWR = 2 + +__all__ = ["TelnetHandler", "TelnetCLIHandler"] + +IAC = chr(255) # "Interpret As Command" +DONT = chr(254) +DO = chr(253) +WONT = chr(252) +WILL = chr(251) +theNULL = chr(0) + +SE = chr(240) # Subnegotiation End +NOP = chr(241) # No Operation +DM = chr(242) # Data Mark +BRK = chr(243) # Break +IP = chr(244) # Interrupt process +AO = chr(245) # Abort output +AYT = chr(246) # Are You There +EC = chr(247) # Erase Character +EL = chr(248) # Erase Line +GA = chr(249) # Go Ahead +SB = chr(250) # Subnegotiation Begin + +BINARY = chr(0) # 8-bit data path +ECHO = chr(1) # echo +RCP = chr(2) # prepare to reconnect +SGA = chr(3) # suppress go ahead +NAMS = chr(4) # approximate message size +STATUS = chr(5) # give status +TM = chr(6) # timing mark +RCTE = chr(7) # remote controlled transmission and echo +NAOL = chr(8) # negotiate about output line width +NAOP = chr(9) # negotiate about output page size +NAOCRD = chr(10) # negotiate about CR disposition +NAOHTS = chr(11) # negotiate about horizontal tabstops +NAOHTD = chr(12) # negotiate about horizontal tab disposition +NAOFFD = chr(13) # negotiate about formfeed disposition +NAOVTS = chr(14) # negotiate about vertical tab stops +NAOVTD = chr(15) # 
negotiate about vertical tab disposition +NAOLFD = chr(16) # negotiate about output LF disposition +XASCII = chr(17) # extended ascii character set +LOGOUT = chr(18) # force logout +BM = chr(19) # byte macro +DET = chr(20) # data entry terminal +SUPDUP = chr(21) # supdup protocol +SUPDUPOUTPUT = chr(22) # supdup output +SNDLOC = chr(23) # send location +TTYPE = chr(24) # terminal type +EOR = chr(25) # end or record +TUID = chr(26) # TACACS user identification +OUTMRK = chr(27) # output marking +TTYLOC = chr(28) # terminal location number +VT3270REGIME = chr(29) # 3270 regime +X3PAD = chr(30) # X.3 PAD +NAWS = chr(31) # window size +TSPEED = chr(32) # terminal speed +LFLOW = chr(33) # remote flow control +LINEMODE = chr(34) # Linemode option +XDISPLOC = chr(35) # X Display Location +OLD_ENVIRON = chr(36) # Old - Environment variables +AUTHENTICATION = chr(37) # Authenticate +ENCRYPT = chr(38) # Encryption option +NEW_ENVIRON = chr(39) # New - Environment variables +# the following ones come from +# http://www.iana.org/assignments/telnet-options +# Unfortunately, that document does not assign identifiers +# to all of them, so we are making them up +TN3270E = chr(40) # TN3270E +XAUTH = chr(41) # XAUTH +CHARSET = chr(42) # CHARSET +RSP = chr(43) # Telnet Remote Serial Port +COM_PORT_OPTION = chr(44) # Com Port Control Option +SUPPRESS_LOCAL_ECHO = chr(45) # Telnet Suppress Local Echo +TLS = chr(46) # Telnet Start TLS +KERMIT = chr(47) # KERMIT +SEND_URL = chr(48) # SEND-URL +FORWARD_X = chr(49) # FORWARD_X +PRAGMA_LOGON = chr(138) # TELOPT PRAGMA LOGON +SSPI_LOGON = chr(139) # TELOPT SSPI LOGON +PRAGMA_HEARTBEAT = chr(140) # TELOPT PRAGMA HEARTBEAT +EXOPL = chr(255) # Extended-Options-List +NOOPT = chr(0) + +#Codes used in SB SE data stream for terminal type negotiation +IS = chr(0) +SEND = chr(1) + +CMDS = { + WILL: 'WILL', + WONT: 'WONT', + DO: 'DO', + DONT: 'DONT', + SE: 'Subnegotiation End', + NOP: 'No Operation', + DM: 'Data Mark', + BRK: 'Break', + IP: 'Interrupt 
process', + AO: 'Abort output', + AYT: 'Are You There', + EC: 'Erase Character', + EL: 'Erase Line', + GA: 'Go Ahead', + SB: 'Subnegotiation Begin', + BINARY: 'Binary', + ECHO: 'Echo', + RCP: 'Prepare to reconnect', + SGA: 'Suppress Go-Ahead', + NAMS: 'Approximate message size', + STATUS: 'Give status', + TM: 'Timing mark', + RCTE: 'Remote controlled transmission and echo', + NAOL: 'Negotiate about output line width', + NAOP: 'Negotiate about output page size', + NAOCRD: 'Negotiate about CR disposition', + NAOHTS: 'Negotiate about horizontal tabstops', + NAOHTD: 'Negotiate about horizontal tab disposition', + NAOFFD: 'Negotiate about formfeed disposition', + NAOVTS: 'Negotiate about vertical tab stops', + NAOVTD: 'Negotiate about vertical tab disposition', + NAOLFD: 'Negotiate about output LF disposition', + XASCII: 'Extended ascii character set', + LOGOUT: 'Force logout', + BM: 'Byte macro', + DET: 'Data entry terminal', + SUPDUP: 'Supdup protocol', + SUPDUPOUTPUT: 'Supdup output', + SNDLOC: 'Send location', + TTYPE: 'Terminal type', + EOR: 'End or record', + TUID: 'TACACS user identification', + OUTMRK: 'Output marking', + TTYLOC: 'Terminal location number', + VT3270REGIME: '3270 regime', + X3PAD: 'X.3 PAD', + NAWS: 'Window size', + TSPEED: 'Terminal speed', + LFLOW: 'Remote flow control', + LINEMODE: 'Linemode option', + XDISPLOC: 'X Display Location', + OLD_ENVIRON: 'Old - Environment variables', + AUTHENTICATION: 'Authenticate', + ENCRYPT: 'Encryption option', + NEW_ENVIRON: 'New - Environment variables', +} + +class TelnetHandler(socketserver.BaseRequestHandler): + "A telnet server based on the client in telnetlib" + + # What I am prepared to do? + DOACK = { + ECHO: WILL, + SGA: WILL, + NEW_ENVIRON: WONT, + } + # What do I want the client to do? 
+ WILLACK = { + ECHO: DONT, + SGA: DO, + NAWS: DONT, + TTYPE: DO, + LINEMODE: DONT, + NEW_ENVIRON: DO, + } + # Default terminal type - used if client doesn't tell us its termtype + TERM = "ansi" + # Keycode to name mapping - used to decide which keys to query + KEYS = { # Key escape sequences + curses.KEY_UP: 'Up', # Cursor up + curses.KEY_DOWN: 'Down', # Cursor down + curses.KEY_LEFT: 'Left', # Cursor left + curses.KEY_RIGHT: 'Right', # Cursor right + curses.KEY_DC: 'Delete', # Delete right + curses.KEY_BACKSPACE: 'Backspace', # Delete left + } + # Reverse mapping of KEYS - used for cooking key codes + ESCSEQ = { + } + # Terminal output escape sequences + CODES = { + 'DEOL': '', # Delete to end of line + 'DEL': '', # Delete and close up + 'INS': '', # Insert space + 'CSRLEFT': '', # Move cursor left 1 space + 'CSRRIGHT': '', # Move cursor right 1 space + } + # What prompt to display + PROMPT = "Telnet Server> " + # The function to call to verify authentication data + authCallback = None + # Does authCallback want a username? + authNeedUser = False + # Does authCallback want a password? + authNeedPass = False + +# --------------------------- Environment Setup ---------------------------- + + def __init__(self, request, client_address, server): + """Constructor. + + When called without arguments, create an unconnected instance. + With a hostname argument, it connects the instance; a port + number is optional. + """ + # Am I doing the echoing? + self.DOECHO = True + # What opts have I sent DO/DONT for and what did I send? + self.DOOPTS = {} + # What opts have I sent WILL/WONT for and what did I send? + self.WILLOPTS = {} + # What commands does this CLI support + self.COMMANDS = {} + self.sock = None # TCP socket + self.rawq = '' # Raw input string + self.cookedq = [] # This is the cooked input stream (list of charcodes) + self.sbdataq = '' # Sub-Neg string + self.eof = 0 # Has EOF been reached? + self.iacseq = '' # Buffer for IAC sequence. 
+ self.sb = 0 # Flag for SB and SE sequence. + self.history = [] # Command history + self.IQUEUELOCK = threading.Lock() + self.OQUEUELOCK = threading.Lock() + self.RUNSHELL = True + # A little magic - Everything called cmdXXX is a command + for k in dir(self): + if k[:3] == 'cmd': + name = k[3:] + method = getattr(self, k) + self.COMMANDS[name] = method + for alias in getattr(method, "aliases", []): + self.COMMANDS[alias] = self.COMMANDS[name] + socketserver.BaseRequestHandler.__init__(self, request, client_address, server) + + def setterm(self, term): + "Set the curses structures for this terminal" + logging.debug("Setting termtype to %s" % (term, )) + curses.setupterm(term) # This will raise if the termtype is not supported + self.TERM = term + self.ESCSEQ = {} + for k in self.KEYS.keys(): + str = curses.tigetstr(curses.has_key._capability_names[k]) + if str: + self.ESCSEQ[str] = k + self.CODES['DEOL'] = curses.tigetstr('el') + self.CODES['DEL'] = curses.tigetstr('dch1') + self.CODES['INS'] = curses.tigetstr('ich1') + self.CODES['CSRLEFT'] = curses.tigetstr('cub1') + self.CODES['CSRRIGHT'] = curses.tigetstr('cuf1') + + def setup(self): + "Connect incoming connection to a telnet session" + logging.debug("calling setterm from setup") + self.setterm(self.TERM) + self.sock = self.request + for k in self.DOACK.keys(): + self.sendcommand(self.DOACK[k], k) + for k in self.WILLACK.keys(): + self.sendcommand(self.WILLACK[k], k) + self.thread_ic = threading.Thread(target=self.inputcooker) + self.thread_ic.setDaemon(True) + self.thread_ic.start() + # Sleep for 0.5 second to allow options negotiation + time.sleep(0.5) + + def finish(self): + "End this session" + self.sock.shutdown(socket.SHUT_RDWR) + +# ------------------------- Telnet Options Engine -------------------------- + + def options_handler(self, sock, cmd, opt): + "Negotiate options" +# if CMDS.has_key(cmd): +# cmdtxt = CMDS[cmd] +# else: +# cmdtxt = "cmd:%d" % ord(cmd) +# if cmd in [WILL, WONT, DO, DONT]: +# if 
CMDS.has_key(opt): +# opttxt = CMDS[opt] +# else: +# opttxt = "opt:%d" % ord(opt) +# else: +# opttxt = "" +# logging.debug("OPTION: %s %s" % (cmdtxt, opttxt, )) + if cmd == NOP: + self.sendcommand(NOP) + elif cmd == WILL or cmd == WONT: + if opt in self.WILLACK: + self.sendcommand(self.WILLACK[opt], opt) + else: + self.sendcommand(DONT, opt) + if cmd == WILL and opt == TTYPE: + self.writecooked(IAC + SB + TTYPE + SEND + IAC + SE) + elif cmd == DO or cmd == DONT: + if opt in self.DOACK: + self.sendcommand(self.DOACK[opt], opt) + else: + self.sendcommand(WONT, opt) + if opt == ECHO: + self.DOECHO = (cmd == DO) + elif cmd == SE: + subreq = self.read_sb_data() + if subreq[0] == TTYPE and subreq[1] == IS: + try: + self.setterm(subreq[2:]) + except: + logging.debug("Terminal type not known") + elif cmd == SB: + pass + else: + logging.debug("Unhandled option: %s %s" % (cmdtxt, opttxt, )) + + def sendcommand(self, cmd, opt=None): + "Send a telnet command (IAC)" +# if CMDS.has_key(cmd): +# cmdtxt = CMDS[cmd] +# else: +# cmdtxt = "cmd:%d" % ord(cmd) +# if opt == None: +# opttxt = '' +# else: +# if CMDS.has_key(opt): +# opttxt = CMDS[opt] +# else: +# opttxt = "opt:%d" % ord(opt) + if cmd in [DO, DONT]: + if opt not in self.DOOPTS: + self.DOOPTS[opt] = None + if (((cmd == DO) and (self.DOOPTS[opt] != True)) + or ((cmd == DONT) and (self.DOOPTS[opt] != False))): +# logging.debug("Sending %s %s" % (cmdtxt, opttxt, )) + self.DOOPTS[opt] = (cmd == DO) + self.writecooked(IAC + cmd + opt) +# else: +# logging.debug("Not resending %s %s" % (cmdtxt, opttxt, )) + elif cmd in [WILL, WONT]: + if opt not in self.WILLOPTS: + self.WILLOPTS[opt] = '' + if (((cmd == WILL) and (self.WILLOPTS[opt] != True)) + or ((cmd == WONT) and (self.WILLOPTS[opt] != False))): +# logging.debug("Sending %s %s" % (cmdtxt, opttxt, )) + self.WILLOPTS[opt] = (cmd == WILL) + self.writecooked(IAC + cmd + opt) +# else: +# logging.debug("Not resending %s %s" % (cmdtxt, opttxt, )) + else: + self.writecooked(IAC + cmd) 
def read_sb_data(self):
    """Drain and return the queued SB ... SE subnegotiation data.

    Returns '' when nothing is queued. Should only be called after a
    SB or SE command has been seen; a new SB discards unread data.
    Never blocks.
    """
    data, self.sbdataq = self.sbdataq, ''
    return data
def getc(self, block=True):
    """Pop one character from the cooked input queue.

    block=True  -- poll (50 ms sleeps) until a character arrives.
    block=False -- return '' immediately when the queue is empty.
    """
    if not block and not self.cookedq:
        return ''
    # Busy-wait with a short sleep until the input cooker thread
    # delivers something.
    while not self.cookedq:
        time.sleep(0.05)
    self.IQUEUELOCK.acquire()
    ch, self.cookedq = self.cookedq[0], self.cookedq[1:]
    self.IQUEUELOCK.release()
    return ch
def inputcooker(self):
    """Input Cooker - Transfer from raw queue to cooked queue.

    Runs until EOF on the socket. Normalizes CR/CRLF/CRNUL to LF,
    translates recognised escape sequences (self.ESCSEQ) to curses
    key codes, and dispatches IAC negotiation sequences to
    self.options_handler. Sets self.eof when the connection closes.
    Doesn't block unless in the midst of an IAC sequence.
    """
    try:
        while True:
            c = self._inputcooker_getc()
            if not self.iacseq:
                if c == IAC:
                    self.iacseq += c
                    continue
                elif c == chr(13) and not(self.sb):
                    # CR, CR NUL and CR LF all become a single LF.
                    c2 = self._inputcooker_getc(block=False)
                    if c2 == theNULL or c2 == '':
                        c = chr(10)
                    elif c2 == chr(10):
                        c = c2
                    else:
                        self._inputcooker_ungetc(c2)
                        c = chr(10)
                elif c in [x[0] for x in self.ESCSEQ.keys()]:
                    # Looks like the beginning of a key (escape) sequence:
                    # greedily read until a full match or a mismatch.
                    codes = c
                    for keyseq in self.ESCSEQ.keys():
                        if len(keyseq) == 0:
                            continue
                        # BUG FIX: was 'len(codes) <= keyseq', which
                        # compares an int to a str and raises TypeError
                        # on Python 3.
                        while codes == keyseq[:len(codes)] and len(codes) <= len(keyseq):
                            if codes == keyseq:
                                c = self.ESCSEQ[keyseq]
                                break
                            codes = codes + self._inputcooker_getc()
                        if codes == keyseq:
                            break
                    # Push back anything beyond the first char that
                    # didn't turn into a match.
                    self._inputcooker_ungetc(codes[1:])
                    codes = codes[0]
                self._inputcooker_store(c)
            elif len(self.iacseq) == 1:
                # IAC: IAC CMD [OPTION only for WILL/WONT/DO/DONT]
                if c in (DO, DONT, WILL, WONT):
                    self.iacseq += c
                    continue
                self.iacseq = ''
                if c == IAC:
                    # Escaped 0xFF data byte.
                    self._inputcooker_store(c)
                else:
                    if c == SB:  # SB ... SE start.
                        self.sb = 1
                        self.sbdataq = ''
                    elif c == SE:  # SB ... SE end.
                        self.sb = 0
                        # Callback is supposed to look into the sbdataq
                        self.options_handler(self.sock, c, NOOPT)
            elif len(self.iacseq) == 2:
                cmd = self.iacseq[1]
                self.iacseq = ''
                if cmd in (DO, DONT, WILL, WONT):
                    self.options_handler(self.sock, cmd, c)
    except EOFError:
        # Connection closed; thread simply exits.
        pass
+ """ + if params: + cmd = params[0].upper() + if cmd in self.COMMANDS: + method = self.COMMANDS[cmd] + doc = method.__doc__.split("\n") + docp = doc[0].strip() + docl = '\n'.join(doc[2:]).replace("\n\t\t", " ").replace("\t", "").strip() + if len(docl) < 4: + docl = doc[1].strip() + self.writeline( + "%s %s\n\n%s" % ( + cmd, + docp, + docl, + ) + ) + return + else: + self.writeline("Command '%s' not known" % cmd) + else: + self.writeline("Help on built in commands\n") + keys = list(self.COMMANDS.keys()) + keys.sort() + for cmd in keys: + method = self.COMMANDS[cmd] + doc = method.__doc__.split("\n") + docp = doc[0].strip() + docs = doc[1].strip() + if len(docp) > 0: + docps = "%s - %s" % (docp, docs, ) + else: + docps = "- %s" % (docs, ) + self.writeline( + "%s %s" % ( + cmd, + docps, + ) + ) + cmdHELP.aliases = ['?'] + + def cmdEXIT(self, params): + """ + Exit the command shell + """ + self.RUNSHELL = False + self.writeline("Goodbye") + cmdEXIT.aliases = ['QUIT', 'BYE', 'LOGOUT'] + + def cmdDEBUG(self, params): + """ + Display some debugging data + """ + for (v,k) in self.ESCSEQ.items(): + line = '%-10s : ' % (self.KEYS[k], ) + for c in v: + if ord(c)<32 or ord(c)>126: + line = line + curses.ascii.unctrl(c) + else: + line = line + c + self.writeline(line) + + def cmdHISTORY(self, params): + """ + Display the command history + """ + cnt = 0 + self.writeline('Command history\n') + for line in self.history: + cnt = cnt + 1 + self.writeline("%-5d : %s" % (cnt, ''.join(line))) + +# ----------------------- Command Line Processor Engine -------------------- + + def handleException(self, exc_type, exc_param, exc_tb): + "Exception handler (False to abort)" + self.writeline(traceback.format_exception_only(exc_type, exc_param)[-1]) + return True + + def handle(self): + "The actual service to which the user has connected." 
+ username = None + password = None + if self.authCallback: + if self.authNeedUser: + if self.DOECHO: + self.write("Username: ") + username = self.readline() + if self.authNeedPass: + if self.DOECHO: + self.write("Password: ") + password = self.readline(echo=False) + if self.DOECHO: + self.write("\n") + try: + self.authCallback(username, password) + except: + return + while self.RUNSHELL: + if self.DOECHO: + self.write(self.PROMPT) + cmdlist = [item.strip() for item in self.readline().split()] + idx = 0 + while idx < (len(cmdlist) - 1): + if cmdlist[idx][0] in ["'", '"']: + cmdlist[idx] = cmdlist[idx] + " " + cmdlist.pop(idx+1) + if cmdlist[idx][0] != cmdlist[idx][-1]: + continue + cmdlist[idx] = cmdlist[idx][1:-1] + idx = idx + 1 + if cmdlist: + cmd = cmdlist[0].upper() + params = cmdlist[1:] + if cmd in self.COMMANDS: + try: + self.COMMANDS[cmd](params) + except: + (t, p, tb) = sys.exc_info() + if self.handleException(t, p, tb): + break + else: + self.write("Unknown command '%s'\n" % cmd) + logging.debug("Exiting handler") + +if __name__ == '__main__': + "Testing - Accept a single connection" + class TNS(socketserver.TCPServer): + allow_reuse_address = True + + class TNH(TelnetHandler): + def cmdECHO(self, params): + """ [ ...] + Echo parameters + Echo command line parameters back to user, one per line. 
+ """ + self.writeline("Parameters:") + for item in params: + self.writeline("\t%s" % item) + + logging.getLogger('').setLevel(logging.DEBUG) + + tns = TNS(("0.0.0.0", 8023), TNH) + tns.serve_forever() + +# vim: set syntax=python ai showmatch: + diff --git a/lustre/tests/lutf/python/tests-infra/clustre_reset.py b/lustre/tests/lutf/python/tests-infra/clustre_reset.py new file mode 100644 index 0000000..0ee03d9 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/clustre_reset.py @@ -0,0 +1,6 @@ +from lustre_cleanup import clean_lustre +from lnet_cleanup import clean_lnet + +def lutf_clean_setup(): + clean_lustre() + clean_lnet() diff --git a/lustre/tests/lutf/python/tests-infra/fio.py b/lustre/tests/lutf/python/tests-infra/fio.py new file mode 100644 index 0000000..2e63aee --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/fio.py @@ -0,0 +1,157 @@ +import os, yaml, random, sys, re, csv, signal, pathlib, shutil +from lutf_basetest import BaseTest, lutfrc +from lutf_exception import LUTFError +import selftest_template as st +from clutf_global import * +from lutf_cmd import lutf_exec_local_cmd +import threading, logging + +fiocfg=['[global]\n', + 'ioengine=%s\n', + 'rw=%s\n', + 'blocksize=%s\n', + 'iodepth=1\n', + 'direct=%d\n', + 'size=%s\n', + 'numjobs=%d\n', + 'group_reporting\n', + 'thread\n', + 'time_based=1\n', + 'runtime=%d\n'] + +fiocpu=['[md%d]\n', + 'directory=%s\n', + 'filename_format=f.$jobnum.$filenum\n'] + +fiogpu=['[md%d]\n', + 'directory=%s\n', + 'filename_format=f.$jobnum.$filenum\n', + 'gpu_dev_id=%d\n'] + +class FioTraffic(BaseTest): + def __init__(self, script=None, target=None, exception=True): + super().__init__(script, target) + self.__exception = exception + self.__fio_cmd = shutil.which('fio') + self.__fio_cfg = None + self.__csv_file = None + self.__csv_writer = None + self.__fieldnames = ['load', 'file name', 'operation', 'file size', 'block size', 'num jobs', + 'duration', 'BW', 'IO', 'cpu usr', 'cpu sys', 'cpu ctx/job', 'cpu ctx', 
def load(self, directory, ioengine='sync', rw='write', blocksize='1M', direct=0,
         size='512M', numjobs=4, runtime=30, gpu=None, cfg=None):
    """Build the fio job configuration for a later start().

    directory -- base directory; one md<N> subdir is created per job
    gpu       -- optional list of GPU device ids, one per job
    cfg       -- optional extra config; cfg['csv'] names a CSV file to
                 record parsed results into
    Returns (False, reason) or raises LUTFError on error.
    """
    # BUG FIX: default was cfg={} — a shared mutable default argument.
    if cfg is None:
        cfg = {}
    # BUG FIX: the original aliased the module-level fiocfg list and
    # append()ed to it, corrupting the template for later load() calls.
    # Work on a copy instead (no 'global' needed).
    lfiocfg = list(fiocfg)

    if not self.__fio_cmd:
        if self.__exception:
            raise LUTFError("fio not available on this machine")
        return False, "fio not available on this machine"

    if 'csv' in cfg:
        self.__csv_file = open(cfg['csv'], 'w')
        # BUG FIX: was csv.DictWriter(csvfile, ...) -> NameError;
        # the open file handle is self.__csv_file.
        self.__csv_writer = csv.DictWriter(self.__csv_file,
                                           fieldnames=self.__fieldnames)
        self.__csv_writer.writeheader()

    load_type = 'CPU'
    if gpu:
        load_type = 'GPU'
        lfiocfg.append('mem=cudamalloc\n')
        if len(gpu) != numjobs:
            # BUG FIX: message used %d with the gpu *list*; report its
            # length instead.
            msg = "Mismatched configuration: gpu %d numjobs %d" % (len(gpu), numjobs)
            if self.__exception:
                raise LUTFError(msg)
            return False, msg
        fileblk = fiogpu
    else:
        fileblk = fiocpu

    # Fill in the [global] section template.
    self.__fio_cfg = ''.join(lfiocfg) % (ioengine, rw, blocksize, direct,
                                         size, numjobs, runtime)

    # One [md<N>] job section (and working subdirectory) per job.
    for i in range(0, numjobs):
        subdir = os.path.join(directory, 'md'+str(i))
        pathlib.Path(subdir).mkdir(parents=True, exist_ok=True)
        if gpu:
            self.__fio_cfg += ''.join(fileblk) % (i, subdir, gpu[i])
        else:
            self.__fio_cfg += ''.join(fileblk) % (i, subdir)

    # Remember the run parameters for __record()'s CSV row.
    self.__data['load'] = load_type
    self.__data['operation'] = rw
    self.__data['filesize'] = size
    self.__data['blocksize'] = blocksize
    self.__data['numjobs'] = numjobs
    self.__data['duration'] = runtime
No configuration") + else: + return False, self.__fio_cfg + + rc = self.__run_fio() + + if self.__csv_writer: + self.__record(rc) + + def stop(self): + if not self.__traffic_thread: + logging.debug("No traffic running to stop") + return + self.__traffic_thread.join() + self.__traffic_thread = None + return + + def __record(self, rc): + if self.__data['operation'] == 'write': + m = re.search('WRITE: bw=(.+?) \((.+?)\), (.+?)-(.+?) \((.+?)-(.+?)\), io=(.+?) \((.+?)\), run=(.+?)-(.+?)msec', + rc[0].decode("utf-8")) + else: + m = re.search('READ: bw=(.+?) \((.+?)\), (.+?)-(.+?) \((.+?)-(.+?)\), io=(.+?) \((.+?)\), run=(.+?)-(.+?)msec', + rc[0].decode("utf-8")) + # get the cpu information + # cpu : usr=0.02%, sys=8.27%, ctx=161380, majf=0, minf=382 + m1 = re.search("cpu(.+?): usr=(.+?), sys=(.+?), ctx=(.+?), majf=(.+?), minf=(.+?)", rc[0].decode("utf-8")) + + ctx_switches = int(int(m1[4]) / self.__data['numjobs']) + self.__csv_writer.writerow({'load': self.__data['load'], 'operation': self.__data['operation'], + 'file size': self.__data['filesize'], 'block size': self.__data['blocksize'], + 'num jobs': self.__data['numjobs'], 'duration': self.__data['duration'], + 'BW': m[2], 'IO': m[8], 'cpu usr': m1[2], 'cpu sys': m1[3], 'cpu ctx/job': str(ctx_switches), + 'cpu ctx': m1[4], 'cpu majf': m1[5], 'cpu minf': m1[6]}) + + def __run_traffic(self, cmd): + return lutf_exec_local_cmd(cmd) + + def __run_fio(self): + if self.__traffic_thread: + logging.debug("Traffic is already running") + return + + cmd_fmt = "%s %s" + + local_path = get_lutf_tmp_dir() + pathlib.Path(local_path).mkdir(parents=True, exist_ok=True) + + fname = os.path.join(local_path, "lutf_fio_"+str(random.getrandbits(32)) + '.cfg') + + f = open(fname, 'w') + f.write(self.__fio_cfg) + f.close() + + cmd = cmd_fmt % (self.__fio_cmd, fname) + + self.__traffic_thread = threading.Thread(target=self.__run_traffic, args=(cmd,)) + self.__traffic_thread.start() + + def unload(self): + if self.__csv_file: + 
self.__csv_file.close() + self.stop() + diff --git a/lustre/tests/lutf/python/tests-infra/lnet.py b/lustre/tests/lutf/python/tests-infra/lnet.py new file mode 100644 index 0000000..7022671 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lnet.py @@ -0,0 +1,816 @@ +from pathlib import Path +import os, shlex, yaml, subprocess, random, time, logging +from lutf_exception import LUTFError, LutfDumper +from lutf_cmd import lutf_exec_local_cmd +from clutf_global import * +from lutf_basetest import BaseTest, lutfrc +from utility_paths import get_lnetctl, lustre_rmmod, LSMOD, MOUNT, load_lnet +import lutf +import lnetconfig + +# Collection() +# Parent class which provides method to keep track of the different elements +# in a dictionary. Provides method to iterate and set items, much like a +# dictionary: +# ex: dict['key'] = value +class Collection: + def __init__(self, typeof): + self.__typeof = typeof + self.__test_db = {} + self.__max = 0 + self.__n = 0 + + def __getitem__(self, key): + try: + rc = self.__test_db[key] + except: + raise ValueError('no entry for', key) + return rc + + def __setitem__(self, key, value): + self.__test_db[key] = self.__typeof(key, value) + self.__max = len(self.__test_db) + + def __iter__(self): + self.__n = 0 + return self + + def __next__(self): + if self.__n < self.__max: + key = list(self.__test_db.keys())[self.__n] + suite = self.__test_db[key] + self.__n += 1 + return key, suite + raise StopIteration + + def __len__(self): + return len(list(self.__test_db.keys())) + + def __contains__(self, key): + return True if key in self.__test_db.keys() else False + + #TODO requires further testing + def __eq__(self, c1): + for k, v in self.__test_db.items(): + if k not in c1: + return False + if v == c1[k]: + continue + else: + return False + return True + + def __ne__(self, c1): + for k, v in self.__test_db.items(): + if k in c1: + return False + return True + + def update(self, key, value): + self.__test_db[key] = self.__typeof(key, 
class LNetNI:
    """A single local NI (network interface) on an LNet net.

    Built either from a user-supplied definition (populate) or from
    'lnetctl export' style YAML (populate_predef).
    """
    def __init__(self, name, ni_def, user=True):
        self.__info = {}
        self.name = name
        self.__nid = ''
        self.__net = ''
        if user:
            self.populate(ni_def)
        else:
            self.populate_predef(ni_def)

    def __eq__(self, ni):
        # Partial comparison: only keys present locally are compared
        # against the remote NI's info.
        remote = ni.get()
        for k, v in self.__info.items():
            if k in remote:
                # check if they match
                if not v == remote[k]:
                    return False
        return True

    def populate(self, ni_def):
        """Fill __info from a user-supplied NI definition dict.

        Recognised keys: net, CPT, interfaces, credits,
        peer_buffer_credits, peer_credits, peer_timeout; anything else
        raises ValueError. NOTE: deletes 'net' from the caller's dict.
        """
        if 'tunables' in self.__info:
            tunables = self.__info['tunables']
        else:
            tunables = {}
        if 'net' in ni_def:
            self.__net = ni_def['net']
            del(ni_def['net'])
        for k, v in ni_def.items():
            if k == 'CPT':
                # BUG FIX: was '==' (a no-op comparison), so the CPT
                # list was silently dropped.
                self.__info[k] = v.replace(',', ' ').split()
            elif k == 'interfaces':
                intf = v.replace(',', ' ').split()
                intfd = {}
                j = 0
                for i in intf:
                    if j == 0:
                        # NID is derived from the first interface's IP.
                        self.__nid = lutf.me.get_local_interface_ip(i)+'@'+self.__net
                    intfd[j] = i
                    # BUG FIX: j was never incremented, so every
                    # interface overwrote slot 0.
                    j += 1
                self.__info[k] = intfd
            elif k == 'credits' or k == 'peer_buffer_credits' or \
                 k == 'peer_credits' or k == 'peer_timeout':
                tunables[k] = v
                self.__info['tunables'] = tunables
            else:
                raise ValueError(k, 'unexpected keyword')

    def dump(self):
        """Serialize this NI's info as YAML."""
        return yaml.dump(self.__info, Dumper=LutfDumper, indent=2, sort_keys=False)

    def list_nids(self, match=None):
        """Return this NI's NID, or None when filtered out or unset."""
        if match and match != self.__net:
            return None
        if len(self.__nid) == 0:
            return None
        return self.__nid

    def list_nids_key(self):
        # NIDs of the same net are grouped under the net name.
        return self.__net

    def get(self):
        return self.__info

    def populate_predef(self, info):
        """Fill from already-structured 'lnetctl export' YAML.

        NOTE: deletes 'net' from the caller's dict, like populate().
        """
        if 'net' in info:
            self.__net = info['net']
            self.__nid = lutf.me.get_local_interface_ip(info['interfaces'][0])+'@'+self.__net
            del(info['net'])
        self.__info = info
class LNetPeer:
    """One LNet peer: a primary NID plus its collection of peer NIs."""

    def __init__(self, primary_nid, args, user=True):
        self.primary_nid = primary_nid
        self.multi_rail = args['Multi-Rail']
        self.peer_nis = LNetPeerNICol()
        # Tag every peer-NI definition with the owning primary NID
        # before inserting it into the collection.
        for ni_def in args['peer ni']:
            ni_def['primary-nid'] = self.primary_nid
            self.peer_nis[ni_def['nid']] = ni_def

    def __eq__(self, peer):
        # Two peers are equal when their full dictionary forms match.
        return self.get() == peer.get()

    def list_nids(self, match=None):
        # When a filter is given, only answer for our own primary NID.
        if match and match != self.primary_nid:
            return None
        return self.peer_nis.list_nids()

    def dump(self):
        """Serialize this peer as a YAML document."""
        return yaml.dump(self.get(), Dumper=LutfDumper, indent=2,
                         sort_keys=False)

    def get(self):
        """Return the peer as a plain dictionary."""
        return {'primary nid': self.primary_nid,
                'Multi-Rail': self.multi_rail,
                'peer ni': self.peer_nis.get()}
class LNetRNet:
    """A remote network reachable through one or more gateways."""

    def __init__(self, net, args, user=True):
        self.net = net
        self.gateways = LNetGatewayCol()
        if user:
            # user config: args is a list of gateway definitions
            for e in args:
                e['rnet'] = self.net
                self.gateways[e['gateway']] = e
        else:
            # parsed YAML: args is a single gateway definition
            args['rnet'] = self.net
            self.gateways[args['gateway']] = args

    def __eq__(self, rnet):
        # Order-insensitive comparison of the flattened route entries.
        me = self.get()
        other = rnet.get()
        if len(me) != len(other):
            return False
        for entry in me:
            if entry not in other:
                return False
        return True

    def append(self, args):
        """Add another gateway entry to this remote net."""
        args['rnet'] = self.net
        self.gateways[args['gateway']] = args

    def list_nids(self, match=None):
        """Return the gateway NIDs, or None if 'match' excludes this net."""
        if match and match != self.net:
            return None
        # BUG FIX: the original iterated self.gateways.get(), which
        # yields plain dicts (no list_nids method) and would raise
        # AttributeError; iterate the LNetGateway objects instead.
        nids = []
        for gw in self.gateways.values():
            nids.append(gw.list_nids())
        return nids

    def list_nids_key(self):
        return self.net

    def dump(self):
        """Serialize the flattened route list as YAML."""
        # CLEANUP: removed the unreachable statements that followed
        # this return in the original.
        return yaml.dump(self.get(), Dumper=LutfDumper, indent=2, sort_keys=False)

    def get(self):
        """Flatten to a list of {'net': ..., <gateway fields>} dicts."""
        config = []
        for gw in self.gateways.get():
            rnet = {'net': self.net}
            rnet.update(gw)
            config.append(rnet)
        return config
def __ne__(self, L1):
    """Return True when the two LNet configurations differ.

    BUG FIX: the original required *every* component (nets, peers,
    routes, routing, buffers, global_vars) to differ before reporting
    inequality, so two configurations differing in only one component
    compared as neither equal nor unequal. '!=' must be the exact
    negation of '==' (the class docstring's 'if L != L1' relies on it).
    """
    return not self.__eq__(L1)
def export_global(self, op=print):
    """Export the global LNet settings.

    When there are any global variables, {'global': <vars>} is built,
    optionally emitted through 'op' as YAML, and returned; otherwise
    an empty dict is returned and 'op' is never called.
    """
    if not len(self.global_vars):
        return {}
    cfg = {'global': self.global_vars}
    if op:
        op(yaml.dump(cfg, Dumper=LutfDumper, indent=2,
                     sort_keys=False))
    return cfg
def update(self, yamlcfg=None):
    '''
    This method exports the LNet configuration using
    lnetctl export --backup > config.yaml
    It then parses that file and stores the data.

    yamlcfg -- optional pre-parsed configuration dict; when given,
    the live export is skipped and this dict is parsed instead.
    '''
    # Drop all cached state first; it is rebuilt below from the YAML.
    self.nets = LNetNetCol()
    self.peers = LNetPeerCol()
    self.routes = LNetRNetCol()
    self.routing = {}
    self.buffers = {}
    self.global_vars = {}

    # if no lnet then just reset the internal data.
    if not self.check_lnet_loaded():
        return

    if not yamlcfg:
        # Pull the live configuration out of the kernel.
        rc = lutf_exec_local_cmd(get_lnetctl() + " export --backup")
        y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader)
    else:
        y = yamlcfg
    # Test code
    #with open('cfg.yaml', 'r') as f:
    #    y = yaml.load(f, Loader=yaml.FullLoader)
    if 'net' in y:
        for entry in y['net']:
            # don't consider the loopback entry
            if entry['net type'] == 'lo':
                continue
            self.nets.update(entry['net type'], entry['local NI(s)'])
    else:
        self.nets = LNetNetCol()

    if 'peer' in y:
        for e in y['peer']:
            self.peers.update(e['primary nid'], e)
    else:
        self.peers = LNetPeerCol()

    if 'route' in y:
        for e in y['route']:
            # Routes are grouped per remote net: append extra gateways
            # to an existing entry, otherwise create a new one.
            if e['net'] in self.routes:
                self.routes[e['net']].append(e)
            else:
                self.routes.update(e['net'], e)
    else:
        self.routes = LNetRNetCol()

    if 'routing' in y:
        self.routing = y['routing']
    else:
        self.routing = {}

    if 'buffers' in y:
        self.buffers = y['buffers']
    else:
        self.buffers = {}

    if 'global' in y:
        self.global_vars = y['global']
    else:
        self.global_vars = {}
del(lpni['refcount']) + return y['peer'] + + def get_net(self, net=None): + if net and type(net) == str: + cmd = get_lnetctl() + " net show --net " + net + else: + cmd = get_lnetctl() + " net show" + rc = lutf_exec_local_cmd(cmd) + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def get_net_detail(self, net=None): + if net and type(net) == str: + cmd = get_lnetctl() + " net show -v 4 --net " + net + else: + cmd = get_lnetctl() + " net show -v 4" + rc = lutf_exec_local_cmd(cmd) + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def get_net_stats(self, net=None): + y = self.get_net_detail(net) + # delete the lo. It's always the first one + del(y['net'][0]) + for nt in y['net']: + for ni in nt['local NI(s)']: + del(ni['status']) + del(ni['interfaces']) + del(ni['tunables']) + return y['net'] + + def get_stats(self): + rc = lutf_exec_local_cmd(get_lnetctl() + " stats show") + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def get_global(self): + rc = lutf_exec_local_cmd(get_lnetctl() + " global show") + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def set_global_param(self, name, value): + rc = lutf_exec_local_cmd(get_lnetctl() + " set %s %d" % (name, value)) + + def get_config(self): + rc = lutf_exec_local_cmd(get_lnetctl() + " export --backup") + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def discover(self, nid): + logging.debug(get_lnetctl() + " discover %s" % nid) + rc = lutf_exec_local_cmd(get_lnetctl() + " discover %s" % nid) + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def ping(self, nid): + rc = lutf_exec_local_cmd(get_lnetctl() + " ping %s" % nid) + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + return y + + def make_net(self, interface_name=None, peer_credits=128, peer_timeout=180, + peer_buffer_credits=0, credits=256): + if not interface_name: + raise 
LUTFError("interface name for net not specified") + return {'interfaces': interface_name, 'peer_credits': peer_credits, + 'peer_timeout': peer_timeout, 'peer_buffer_credits': peer_buffer_credits, + 'credits': credits} + + def make_route(self, gw=None, hop=-1, priority=0, sen=1): + if not gw: + raise LUTFError("gateway not specified for route") + return {'gateway': gw, 'hop': hop, 'priority': priority, 'health_sensitivity': sen} + + def make_peer(self, mr=True, *peer_nis): + lpnis = [] + for e in peer_nis: + lpnis.append({'nid': e}) + return {'Multi-Rail': mr, 'peer_ni': lpnis} + diff --git a/lustre/tests/lutf/python/tests-infra/lnet_cleanup.py b/lustre/tests/lutf/python/tests-infra/lnet_cleanup.py new file mode 100644 index 0000000..f5d64c3 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lnet_cleanup.py @@ -0,0 +1,45 @@ +""" +1. Look through all the agents and determine the role each one has. IE: OSS, MDS +2. modprobe lnet +3. lnetctl lnet unconfigure +4. lustre_rmmod +""" + +import os, yaml, logging +import lutf_agent +from lutf_basetest import BaseTest +from lnet_selftest import LNetSelfTest +import lnet +from utility_paths import lustre_rmmod + +class LNetCleanup(BaseTest): + def __init__(self, target=None): + logging.critical("INSTANTIATING LNetCleanup for target " + str(target)) + super().__init__(os.path.abspath(__file__), + target=target) + + def unconfigure(self): + logging.critical("LNetCleanup::unconfigure() --> lustre_rmmod()") + lustre_rmmod() +# st = LNetSelfTest() +# try: +# st.unload() +# except: +# pass +# L = lnet.TheLNet() +# L.unconfigure() + +def clean_lnet(): + agents = lutf_agent.LutfAgents() + agent_list = agents.keys() + cleanups = [] + + logging.critical("Cleaning LNet for " + str(agent_list)) + + for a in agent_list: + cleanups.append(LNetCleanup(target=a)) + + for v in cleanups: + v.unconfigure() + + diff --git a/lustre/tests/lutf/python/tests-infra/lnet_helpers.py b/lustre/tests/lutf/python/tests-infra/lnet_helpers.py new file 
mode 100644 index 0000000..376c460 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lnet_helpers.py @@ -0,0 +1,731 @@ +import os, re +import yaml, ast, psutil +import lnetconfig, logging +from lutf import agents, me +from lutf_basetest import BaseTest, lutfrc +from lnet import TheLNet +from lutf_exception import LUTFError +from lutf_cmd import lutf_exec_local_cmd +from utility_paths import get_lnetctl, CAT, MAN + +LNET_NRB_TINY_MIN = 512 +LNET_NRB_TINY = LNET_NRB_TINY_MIN * 4 +LNET_NRB_SMALL_MIN = 4096 +LNET_NRB_SMALL = LNET_NRB_SMALL_MIN * 4 +LNET_NRB_LARGE_MIN = 256 +LNET_NRB_LARGE = LNET_NRB_LARGE_MIN * 4 + +class LNetHelpers(BaseTest): + def __init__(self, script=os.path.abspath(__file__), + target=None, exceptions=True): + super().__init__(script, target=target) + self.exceptions = exceptions + self.__nid = None + logging.debug('LNetHelpers: %s == %s' % (me.name, target)) + if not target or me.name == target: + logging.debug('Initializing LNetHelper') + rc = lnetconfig.lustre_lnet_config_lib_init() + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + raise LUTFError("Failed to initialize the liblnetconfig library") + + def __del__(self): + lnetconfig.lustre_lnet_config_lib_uninit() + super().__del__() + + def uninit(self): + logging.debug('uninit: Uninitializing LNetHelper') + lnetconfig.lustre_lnet_config_lib_uninit() + + def set_exception(self, exception): + self.exceptions = exception + + def get_mem_info(self): + return psutil.virtual_memory() + + def configure_lnet(self): + L = TheLNet() + + def unconfigure_lnet(self): + L = TheLNet() + L.unconfigure() + + def get_num_cpus(self): + return me.get_num_cpus() + + def configure_net(self, net, pintfs=None, pycfg=None): + L = TheLNet() + if pycfg: + L.configure_yaml(pycfg) + L1 = TheLNet() + L1.update() + if not L1.nets == L.nets and not net in L1.nets: + if self.exceptions: + raise LUTFError("LNet %s configuration failed" % net) + return False, net + return True, None + + if pintfs: + intfs = pintfs + else: 
+ intfs = self.get_available_devs() + if len(intfs) == 0: + if self.exceptions: + raise LUTFError("node doesn't have any interfaces") + return False, net + # configure the first interface + nets = [] + for intf in intfs: + logging.debug("configuring: %s" % intf) + nets.append(L.make_net(intf)) + logging.debug(str(nets)) + L.nets[net] = nets + L.configure() + L1 = TheLNet() + L1.update() + if not L1.nets == L.nets and not net in L1.nets: + if self.exceptions: + raise LUTFError("LNet %s configuration failed" % net) + return False, net + net_show = L1.get_net() + for n in net_show['net']: + if n['net type'] == 'lo' or n['net type'] != net: + continue + self.__nid = n['local NI(s)'][0]['nid'] + break + + logging.debug(self.__nid) + return True, None + + def unconfigure_net(self, net): + if not net: + return True + L = TheLNet() + L.update() + L.unconfigure_net(net) + nets = L.export_nets(op=None) + for n in nets['net']: + if net == n['net type']: + if self.exceptions: + raise LUTFError("net %s was not unconfigure properly" % net) + return False + return True + + def get_nid(self): + return self.__nid + + def list_nids(self): + L = TheLNet() + nids = [] + net_show = L.get_net() + for n in net_show['net']: + if n['net type'] == 'lo': + continue + for nid in n['local NI(s)']: + nids.append(nid['nid']) + return nids + + def get_available_devs(self): + intfs = me.list_intfs() + return list(intfs['interfaces'].keys()) + + def get_available_intfs(self): + return me.list_intfs() + + def api_config_ni(self, net, device_list=[], global_cpts=None, ip2nets=None, + peer_credits=128, peer_timeout=180, peer_buffer_credits=0, + credits=256): + tunables = lnetconfig.lnet_ioctl_config_lnd_tunables() + tunables.lt_cmn.lct_peer_timeout = peer_timeout + tunables.lt_cmn.lct_peer_tx_credits = peer_credits; + tunables.lt_cmn.lct_max_tx_credits = credits; + tunables.lt_cmn.lct_peer_rtr_credits = peer_buffer_credits + + if (ip2nets == None): + nwd = lnetconfig.lnet_dlc_network_descr() + 
lnetconfig.lustre_lnet_init_nw_descr(nwd) + nwd.nw_id = lnetconfig.libcfs_str2net(net) + devices_str = '' + for device in device_list[:-1]: + devices_str += device + ',' + if len(device_list) > 0: + devices_str += device_list[-1] + rc = lnetconfig.lustre_lnet_parse_interfaces(devices_str, nwd) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to parse interfaces %d" % rc) + return False, [rc, net, device_list, global_cpts, ip2nets] + else: + nwd = None + g_cpts = None + if global_cpts != None and type(global_cpts) is list: + rc, g_cpts = lnetconfig.cfs_expr_list_parse(str(global_cpts), len(str(global_cpts)), 0, lnetconfig.UINT_MAX) + if rc != 0: + if self.exceptions: + raise LUTFError("Failed to parse global_cpts") + return False, [rc, net, device_list, global_cpts, ip2nets] + else: + g_cpts = None + rc, yaml_err = lnetconfig.lustre_lnet_config_ni(nwd, g_cpts, ip2nets, tunables, -1) + #Freeing the g_cpts causes a segmentation fault + #if g_cpts: + # lnetconfig.cfs_expr_list_free(g_cpts) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to config ni %s:%s:%s:%s" % + (str(net), str(device_list), + str(global_cpts), str(ip2nets))) + return False, [rc, net, device_list, global_cpts, ip2nets] + return True, [rc, net, device_list, global_cpts, ip2nets] + + def api_del_ni(self, net, device_list): + nwd = lnetconfig.lnet_dlc_network_descr() + lnetconfig.lustre_lnet_init_nw_descr(nwd) + nwd.nw_id = lnetconfig.libcfs_str2net(net) + devices_str = '' + for device in device_list[:-1]: + devices_str += device + ',' + if len(device_list) > 0: + devices_str += device_list[-1] + rc = lnetconfig.lustre_lnet_parse_interfaces(devices_str, nwd) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to parse interfaces") + return False, [rc, net, device_list] + rc, yaml_err = lnetconfig.lustre_lnet_del_ni(nwd, -1) + 
self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to del ni") + return False, [rc, net, device_list] + return True, [rc, net, device_list] + + def api_check_ni(self, net = None, num = 1, global_cpts=None, iname=None, peer_credits=128, + peer_timeout=180, peer_buffer_credits=0, credits=256): + rc, yaml_show, yaml_err = lnetconfig.lustre_lnet_show_net(net, 0, -1, False) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + self.cYAML_free(yaml_show) + if self.exceptions: + raise LUTFError("Failed to show NIs") + return False, [rc, num, err] + else: + # basic check to make sure there are the right number of nets + # configured + show = lnetconfig.cYAML_dump(yaml_show) + self.cYAML_free(yaml_show) + pyy = yaml.load(show, Loader=yaml.FullLoader) + count = len(pyy['net'][0]['local NI(s)']) + if pyy['net'][0]['net type'] != net or count != num: + if self.exceptions: + raise LUTFError("Show doesn't match %d != %d\n%s" % (count, num, show)) + return False, [rc, count, num, show] + # Check the tunables match + for n in pyy['net']: + if n['net type'] == net: + for i in n['local NI(s)']: + if iname and iname in list(i['interfaces'].values()): + if i['tunables']['peer_timeout'] != peer_timeout or \ + i['tunables']['peer_credits'] != peer_credits or \ + i['tunables']['peer_buffer_credits'] != peer_buffer_credits or \ + i['tunables']['credits'] != credits or \ + (global_cpts and ast.literal_eval(i['CPT']) != global_cpts): + if self.exceptions: + raise LUTFError("configured ni tunables don't match") + return False, [rc, count, num, show] + return True, [rc, show] + + def api_configure_route(self, rnet=None, gw=None, hop=-1, prio=0, sen=1): + rc, yaml_err = lnetconfig.lustre_lnet_config_route(rnet, gw, hop, prio, sen, -1) + self.cYAML_free(yaml_err) + if rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR: + if self.exceptions: + raise LUTFError("failed to configure 
route. rc=%s, rnet=%s gw=%s hop=%s pio=%s sen=%s" % (str(rc), str(rnet), str(gw), str(hop), str(prio), str(sen))) + return False, [rnet, gw, hop, prio, sen] + + # check the route was configured as expected + L1 = TheLNet() + L1.update() + L1.export(op=logging.debug) + if rnet not in L1.routes.keys(): + if self.exceptions: + raise LUTFError("failed to configure remote net %s" % rnet) + return False, [rnet, gw, hop, prio, sen] + route = L1.routes[rnet].get() + if len(route) == 0: + if self.exceptions: + raise LUTFError("failed to configure remote net %s" % rnet) + return False, [rnet, gw, hop, prio, sen] + logging.debug(yaml.dump({'original': [rnet, gw, hop, prio, sen], 'configured': route[0]})) + if route[0]['gateway'] != gw or \ + route[0]['hop'] != hop or \ + route[0]['priority'] != prio or \ + (route[0]['health_sensitivity'] != sen and sen != -1) or \ + (route[0]['health_sensitivity'] != 1 and sen == -1): + if self.exceptions: + raise LUTFError("Configured route is not expected", {'original': [rnet, gw, hop, prio, sen], 'configured': route[0]}) + return False, [[rnet, gw, hop, prio, sen], route[0]] + return True, [rnet, gw, hop, prio, sen] + + def cYAML_count(self, blk): + if (blk == None): + return 0 + yy = yaml.load(blk, Loader=yaml.FullLoader) + logging.debug(str(yy)) + return len(yy[next(iter(yy))]) + + def cYAML_free(self, cyaml): + if (cyaml): + lnetconfig.cYAML_free_tree(cyaml.cy_child) + + def api_del_route(self, rnet=None, gw=None): + # delete route but missing net + rc, yaml_err = lnetconfig.lustre_lnet_del_route(rnet, gw, -1) + self.cYAML_free(yaml_err) + if (rc == lnetconfig.LUSTRE_CFG_RC_MISSING_PARAM): + if self.exceptions: + raise LUTFError("Failed to delete route") + return False, [rc, rnet, gw] + return True, [rc, rnet, gw] + + def api_check_route(self, num, network = None, gateway = None, hop = -1, prio = -1): + logging.debug("show route: %s %s %s %s" % (str(network), str(gateway), str(hop), str(prio))) + rc, yaml_show, yaml_err = 
lnetconfig.lustre_lnet_show_route(network, gateway, hop, prio, 1, -1, False) + logging.debug("show_route: rc = %s" % (str(rc))) + self.cYAML_free(yaml_err) + if (rc == lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if yaml_show is None: + count = 0 + else: + show = lnetconfig.cYAML_dump(yaml_show) + count = self.cYAML_count(show) + logging.debug("%s Routes detected" % (str(count))) + logging.debug(show) + # free the memory (This is a call into the C code) + self.cYAML_free(yaml_show) + if (count != num): + if self.exceptions: + raise LUTFError("%d doesn't match number of configured routes %d" % (count, num)) + return False, [count, num, network, gateway, hop, prio] + elif num > 0: + self.cYAML_free(yaml_show) + if self.exceptions: + raise LUTFError("No routes configured") + return False, [count, num, network, gateway, hop, prio] + return True, None + + def api_config_rtr_buffers(self, tiny=-1, small=-1, large=-1): + rc, yaml_err = lnetconfig.lustre_lnet_config_buffers(tiny, small, large, -1) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to configure the buffers") + return False, [rc, err] + return True, None + + def api_set_routing(self, enable): + rc, yaml_err = lnetconfig.lustre_lnet_enable_routing(enable, -1) + logging.debug("rc = %d" % rc) + err = lnetconfig.cYAML_dump(yaml_err) + logging.debug(err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("Failed to set routing") + return False, [rc, err] + return True, None + + def api_check_rtr_buffers(self, tiny=LNET_NRB_TINY, small=LNET_NRB_SMALL, large=LNET_NRB_LARGE): + rc, yaml_show, yaml_err = lnetconfig.lustre_lnet_show_routing(-1, False) + err = lnetconfig.cYAML_dump(yaml_err) + show = lnetconfig.cYAML_dump(yaml_show) + self.cYAML_free(yaml_err) + self.cYAML_free(yaml_show) + if rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR: + if self.exceptions: + 
raise LUTFError("Couldn't configure router buffers: %d, %d, %d" % (tiny, small, large)) + return False, [rc, err] + pyshow = yaml.load(show, Loader=yaml.FullLoader) + if pyshow['buffers']['tiny'] != tiny or \ + pyshow['buffers']['small'] != small or \ + pyshow['buffers']['large'] != large: + if self.exceptions: + raise LUTFError("rtr buffer values configured do not match %d != %d, %d != %d, %d != %d" % + (pyshow['buffers']['tiny'], tiny, pyshow['buffers']['small'], small, + pyshow['buffers']['large'], large)) + return False, [rc, pyshow] + return True, [rc, pyshow] + + def replace_sep(self, nidstr, old, new): + bracket = 0 + for i in range(0, len(nidstr)): + if nidstr[i] == '[': + bracket += 1 + elif nidstr[i] == ']': + bracket -= 1 + elif nidstr[i] == old and bracket == 0: + tmp = list(nidstr) + tmp[i] = new + nidstr = "".join(tmp) + return nidstr + + def api_verify_peer(self, prim_nid, nids): + rc, yaml_show, yaml_err = lnetconfig.lustre_lnet_show_peer(prim_nid, 4, -1, False) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + try: + show = lnetconfig.cYAML_dump(yaml_show) + self.cYAML_free(yaml_show) + except: + show = '' + if rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR: + if self.exceptions: + raise LUTFError("Couldn't show peer %s" % (prim_nid)) + pyerr = yaml.load(err, Loader=yaml.FullLoader) + return False, [rc, pyerr] + nidlist = [] + if nids: + nids = self.replace_sep(nids, ',', ' ') + nidl = lnetconfig.lutf_parse_nidlist(nids, len(nids), + lnetconfig.LNET_MAX_NIDS_PER_PEER) + for n in nidl: + nidlist.append(lnetconfig.lutf_nid2str(n)) + pyshow = yaml.load(show, Loader=yaml.FullLoader) + if prim_nid: + nidlist.insert(0, prim_nid) + nids_found = [] + for nid in nidlist: + for peerni in pyshow['peer'][0]['peer ni']: + if peerni['nid'] == nid: + nids_found.append(nid) + break; + return True, [rc, nidlist, nids_found] + + def api_config_peer(self, prim_nid=None, nids=None, is_mr=True): + rc, yaml_err = lnetconfig.lustre_lnet_modify_peer(prim_nid, 
nids, is_mr, lnetconfig.LNETCTL_ADD_CMD, -1) + if rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR: + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if self.exceptions: + raise LUTFError("Couldn't configure peer %s: %s" % (prim_nid+' '+nids, err)) + pyerr = yaml.load(err, Loader=yaml.FullLoader) + return False, [rc, pyerr] + # verify config + if prim_nid: + key = prim_nid + else: + mynids = self.replace_sep(nids, ',', ' ') + nidl = lnetconfig.lutf_parse_nidlist(mynids, len(mynids), + lnetconfig.LNET_MAX_NIDS_PER_PEER) + key = lnetconfig.lutf_nid2str(nidl[0]) + rc, info = self.api_verify_peer(key, nids) + if rc == False: + if self.exceptions: + raise LUTFError("Couldn't verify peer " + key) + return rc, info + # expect that the NIDs found is the same as the ones we're looking for + if info[1] != info[2]: + if self.exceptions: + raise LUTFError("Configured nids are in correct" + key) + return False, info + return True, None + + def api_del_peer(self, prim_nid=None, nids=None, all=True): + rc, yaml_err = lnetconfig.lustre_lnet_modify_peer(prim_nid, nids, False, + lnetconfig.LNETCTL_DEL_CMD, -1) + if rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR: + err = lnetconfig.cYAML_dump(yaml_err) + if self.exceptions: + raise LUTFError("Couldn't del peer %s:%d" % (prim_nid, rc)) + pyerr = yaml.load(err, Loader=yaml.FullLoader) + return False, [rc, pyerr] + self.cYAML_free(yaml_err) + # verify config + try: + rc, info = self.api_verify_peer(prim_nid, nids) + if rc and not all: + # verify that all the peers indicated are gone, excluding the primary nid + nidlist = info[1] + foundlist = info[2] + for n in nidlist[1:]: + if n in foundlist: + if self.exceptions: + raise LUTFError("nid %s wasn't deleted" % (n)) + return False, info + elif rc: + if self.exceptions: + raise LUTFError("Peer %s was not deleted" % (prim_nid)) + return False, [rc, info] + except: + if not all: + if self.exceptions: + raise LUTFError("Peer %s was not deleted properly" % (prim_nid)) + return False, [rc, 
info] + pass + return True, None + + def api_yaml_cfg(self, yaml_file, count, del_count=0, delete = True): + logging.debug("configuring yaml file %s" % yaml_file) + rc, yaml_err = lnetconfig.lustre_yaml_config(yaml_file) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + logging.debug("config failed with: %d \n%s" % (rc, err)) + if self.exceptions: + raise LUTFError("configuration failed") + return False, [rc, err] + + rc, yaml_show, yaml_err = lnetconfig.lustre_yaml_show(yaml_file) + #rc, yaml_show, yaml_err = lnetconfig.lustre_lnet_show_net(None, 0, -1, False) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + logging.debug(lnetconfig.cYAML_dump(yaml_show)) + err = lnetconfig.cYAML_dump(yaml_err) + return False, [rc, err] + + show = lnetconfig.cYAML_dump(yaml_show) + pyy = yaml.load(show, Loader=yaml.FullLoader) + show_count = 0 + for k, v in pyy.items(): + logging.debug("key = %s, value = %s, len = %d" % (str(k), str(v), len(v))) + show_count += len(v) + #show_count = self.cYAML_count(show) + logging.debug("show count = %d\n%s" % (show_count, show)) + + # verify the show through the count only + self.cYAML_free(yaml_show) + if (show_count != count): + error = "show count doesn't match. 
%d != %d\n%s" % (show_count, count, show) + logging.debug(error) + if self.exceptions: + raise LUTFError(error) + return False, [rc, show] + + if (delete == True): + logging.debug("deleting yaml file: %s" % yaml_file) + rc, yaml_err = lnetconfig.lustre_yaml_del(yaml_file) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("configuration failed") + return False, [rc, err] + + logging.debug("showing after deleting yaml file: %s" % yaml_file) + rc, yaml_show, yaml_err = lnetconfig.lustre_yaml_show(yaml_file) + err = lnetconfig.cYAML_dump(yaml_err) + self.cYAML_free(yaml_err) + if (rc != lnetconfig.LUSTRE_CFG_RC_NO_ERR): + if self.exceptions: + raise LUTFError("configuration failed") + return False, [rc, err] + + # verify the show through the count only + logging.debug("yaml_show type is %s" % str(type(yaml_show))) + # handle two cases: + # yaml_show is NULL or an empty tree + if yaml_show == None: + return True, None + show = lnetconfig.cYAML_dump(yaml_show) + pyy = yaml.load(show, Loader=yaml.FullLoader) + show_count = 0 + for k, v in pyy.items(): + show_count += len(v) + self.cYAML_free(yaml_show) + if (show_count != del_count): + error = "show count doesn't match. 
%d != %d\n%s" % (show_count, del_count, show) + logging.debug(error) + if self.exceptions: + raise LUTFError(error) + return False, [rc, show] + + return True, None + + def import_config(self, data): + L = TheLNet() + L.import_config(data) + + def import_del(self, data): + L = TheLNet() + L.import_del(data) + + def discover(self, nid): + L = TheLNet() + rc = L.discover(nid) + nids = [] + if 'manage' in list(rc.keys()): + return [] + for entry in rc['discover']: + for nid in entry['peer ni']: + nids.append(nid['nid']) + return nids + + def ping(self, nid): + L = TheLNet() + rc = L.ping(nid) + nids = [] + if 'manage' in list(rc.keys()): + return [] + for entry in rc['ping']: + for nid in entry['peer ni']: + nids.append(nid['nid']) + return nids + + def get_nets(self, net=None, wrapper=False, detail=False): + L = TheLNet() + if not wrapper: + if detail: + return L.get_net_detail(net) + return L.get_net(net) + else: + L.update() + return L.nets[net].get() + + def get_net_stats(self, net=None): + L = TheLNet() + return L.get_net_stats(net) + + def get_peers(self, nid=None, detailed=False): + L = TheLNet() + if not detailed: + return L.get_peers(nid=nid) + return L.get_peers_detail(nid=nid) + + def get_peer_stats(self, nid=None): + L = TheLNet() + return L.get_peer_stats(nid=nid) + + def get_stats(self): + L = TheLNet() + return L.get_stats() + + def set_discovery(self, value): + L = TheLNet() + L.set_global_param('discovery', value) + + def set_max_intf(self, value): + L = TheLNet() + L.set_global_param('max_interfaces', value) + + def set_numa_range(self, value): + L = TheLNet() + L.set_global_param('numa_range', value) + + def set_drop_asym_route(self, value): + L = TheLNet() + L.set_global_param('drop_asym_route', value) + + def set_retry_count(self, value): + L = TheLNet() + L.set_global_param('retry_count', value) + + def set_transaction_timeout(self, value): + L = TheLNet() + L.set_global_param('transaction_timeout', value) + + def set_health_sensitivity(self, 
value): + L = TheLNet() + L.set_global_param('health_sensitivity', value) + + def set_recovery_interval(self, value): + L = TheLNet() + L.set_global_param('recovery_interval', value) + + def set_router_sensitivity(self, value): + L = TheLNet() + L.set_global_param('router_sensitivity', value) + + def get_globals(self): + L = TheLNet() + return L.get_global() + + def get_config(self): + L = TheLNet() + return L.get_config() + + def configure_yaml(self, yml): + L = TheLNet() + L.configure_yaml(yml) + + def get_cpu_partition_distance(self): + p = {} + cptd = os.path.join(os.sep, 'sys', 'kernel', 'debug', 'lnet', 'cpu_partition_distance') + if not os.path.isfile(cptd): + return p + rc = lutf_exec_local_cmd(CAT + " " + cptd) + p = yaml.load(rc[0].decode('utf-8').replace('\t', ' '), + Loader=yaml.FullLoader) + for k, v in p.items(): + l = v.split() + d = {} + for e in l: + entry = e.split(':') + d[int(entry[0])] = int(entry[1]) + p[k] = d + return p + + def check_udsp_present(self): + rc = lutf_exec_local_cmd(MAN + " lnetctl") + ret_str = str(rc) + if "UDSP" in ret_str: + return True + return False + + def cleanup_udsp(self, num_rules=1): + for ii in range (0, num_rules): + rc = lutf_exec_local_cmd(get_lnetctl() + " udsp del --idx 0") + return rc + + def check_udsp_empty(self): + rc = lutf_exec_local_cmd(get_lnetctl() + " udsp show") + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + print(y) + if y != None: + #print("UDSP list not empty") + error = "UDSP list not empty" + logging.debug(error) + return False + else: + return True + + def check_udsp_expected(self, udsp_conf_expected_dict): + rc = lutf_exec_local_cmd(get_lnetctl() + " udsp show") + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + print(y) + #print ("Out") + #print(rc[0]) + if y == udsp_conf_expected_dict: + return True + else: + error = "%s doesn't match expected: %s" % (str(y), str(udsp_conf_expected_dict)) + #print("%s doesn't match expected: %s ", str(y), 
str(udsp_conf_expected_dict)) + logging.debug(error) + return False + + def exec_udsp_cmd(self, udsp_cmd_string): + rc = lutf_exec_local_cmd(get_lnetctl() + " udsp " + udsp_cmd_string) + return rc + + def exec_ping(self, dest_nid): + rc = lutf_exec_local_cmd(get_lnetctl() + " ping " + dest_nid) + ret_str = str(rc) + if "primary nid" in ret_str: + return True + return False + + def exec_route_cmd(self, route_cmd): + rc = lutf_exec_local_cmd(get_lnetctl() + " route " + route_cmd) + return rc + + def exec_discover_cmd(self, nid): + rc = lutf_exec_local_cmd(get_lnetctl() + " discover " + nid) + y = yaml.load(rc[0].decode('utf-8'), Loader=yaml.FullLoader) + nids = [] + if 'manage' in list(y.keys()): + return [] + for entry in y['discover']: + for nid in entry['peer ni']: + nids.append(nid['nid']) + return nids diff --git a/lustre/tests/lutf/python/tests-infra/lnet_selftest.py b/lustre/tests/lutf/python/tests-infra/lnet_selftest.py new file mode 100644 index 0000000..53f99f8 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lnet_selftest.py @@ -0,0 +1,57 @@ +import os, yaml, random, pathlib, re +from lutf_basetest import BaseTest, lutfrc +from lutf_exception import LUTFError, LutfDumper +import selftest_template as st +from lutf_cmd import lutf_exec_local_cmd +from lutf import lutf_tmp_dir +from utility_paths import RMMOD, MODPROBE + +class LNetSelfTest(BaseTest): + def __init__(self, script=os.path.abspath(__file__), target=None, exception=True): + super().__init__(script, target) + self.__exception = exception + + def load(self): + lutf_exec_local_cmd(MODPROBE + ' lnet_selftest') + + def unload(self): + lutf_exec_local_cmd(RMMOD + ' lnet_selftest') + + def start(self, src, dst, size='1M', time=30, brw='write', concurrency=32, d1=1, d2=1): + fail_res = {'nids': [], 'brwerr': 0, 'pingerr': 0, 'rpcerr': 0, 'drop': 0, 'exp': 0} + fail_results = [] + rname = os.path.join(lutf_tmp_dir, "st"+str(random.getrandbits(32)) + '.report') + script = os.path.join(lutf_tmp_dir, 
"st"+str(random.getrandbits(32)) + '.sh') + with open(script, 'w') as f: + numlines = f.write(''.join(st.selftest_script) % (size, time, brw, src, dst, concurrency, d1, d2, rname, '%5', rname)) + lutf_exec_local_cmd('/usr/bin/chmod u+rwx ' + script) + try: + rc = lutf_exec_local_cmd(script, expire=time+3) + except Exception as e: + if type(e) != StopIteration: + raise e + else: + pass + with open(rname, 'r') as f: + input = f.readlines() + res = re.search('(.+?): [Session (.+?) brw errors, (.+?) ping errors] [RPC: (.+?) errors, (.+?) dropped, (.+?) expired]', ''.join(input)) + if res: + fail_res['nids'].append(res[1]) + fail_res['brwerr'] = int(res[2]) + fail_res['pingerr'] = int(res[3]) + fail_res['rpcerr'] = int(res[4]) + fail_res['drop'] = int(res[5]) + fail_res['exp'] = int(res[6]) + fail_results.append(fail_res) + os.remove(rname) + os.remove(script) + if len(fail_results) > 0: + if self.__exception: + str_errors = yaml.dump(fail_results, Dumper=LutfDumper, indent=2, sort_keys=True) + raise LUTFError("Errors in selftest: %s" % str_errors) + return False, fail_results + return True, None + + def stop(self): + self.unload() + return diff --git a/lustre/tests/lutf/python/tests-infra/lustre_cleanup.py b/lustre/tests/lutf/python/tests-infra/lustre_cleanup.py new file mode 100644 index 0000000..fb03398 --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lustre_cleanup.py @@ -0,0 +1,111 @@ +""" +1. Look through all the agents and determine the role each one has. IE: OSS, MDS +2. Umount clients +3. Umount OSS +4. 
Umount MDS +""" + +import os +import yaml, logging +import lutf_agent +from lutf_basetest import BaseTest +from lutf_cmd import lutf_exec_local_cmd +from lustre_roles import * +from utility_paths import MOUNT, UMOUNT + +class LustreCleanup(BaseTest): + def __init__(self, target=None): + logging.critical("INSTANTIATING LustreCleanup for target " + str(target)) + super().__init__(os.path.abspath(__file__), + target=target) + + def __get_lustre_mount(self): + mounts = [] + rc = lutf_exec_local_cmd(MOUNT) + if not rc: + return mounts + out = rc[0].decode('utf-8') + if len(out) == 0: + return mounts + tmp = out.split("\n") + for e in tmp: + if 'type lustre' in e: + mounts.append(e.strip()) + return mounts + + def get_role(self): + node_info = {} + mounts = self.__get_lustre_mount() + + if len(mounts) == 0: + return LUSTRE_NODE_ROLE_UNDEFINED + for m in mounts: + if 'svname' in m: + # this is a server of some form so let's try and + # figure out what it is + if os.path.isfile('/sys/fs/lustre/mgs/MGS/uuid'): + if LUSTRE_NODE_ROLE_MGS in node_info: + node_info[LUSTRE_NODE_ROLE_MGS].append(m.split()[2]) + else: + node_info[LUSTRE_NODE_ROLE_MGS] = [m.split()[2]] + elif os.path.isfile('/sys/fs/lustre/mds/MDS/uuid'): + if LUSTRE_NODE_ROLE_MDS in node_info: + node_info[LUSTRE_NODE_ROLE_MDS].append(m.split()[2]) + else: + node_info[LUSTRE_NODE_ROLE_MDS] = [m.split()[2]] + elif os.path.isfile('/sys/fs/lustre/ost/OSS/uuid'): + if LUSTRE_NODE_ROLE_OSS in node_info: + node_info[LUSTRE_NODE_ROLE_OSS].append(m.split()[2]) + else: + node_info[LUSTRE_NODE_ROLE_OSS] = [m.split()[2]] + elif os.path.isdir('/sys/fs/lustre/mdt/'): + for subdir, dirs, files in os.walk('/sys/fs/lustre/mdt/'): + for f in files: + if f == 'uuid': + if LUSTRE_NODE_ROLE_MDT in node_info: + node_info[LUSTRE_NODE_ROLE_MDT].append(m.split()[2]) + else: + node_info[LUSTRE_NODE_ROLE_MDT] = [m.split()[2]] + else: + if LUSTRE_NODE_ROLE_CLIENT in node_info: + node_info[LUSTRE_NODE_ROLE_CLIENT].append(m.split()[2]) + else: 
+ node_info[LUSTRE_NODE_ROLE_CLIENT] = [m.split()[2]] + + return node_info + + def umount(self, point): + lutf_exec_local_cmd(UMOUNT+" "+point) + +def clean_lustre(): + agents = lutf_agent.LutfAgents() + agent_list = agents.keys() + cleanups = [] + logging.critical("Cleaning up lustre for " + str(agent_list)) + for a in agent_list: + cleanups.append({'obj': LustreCleanup(target=a), 'role': ''}) + + for v in cleanups: + v['role'] = v['obj'].get_role() + + for v in cleanups: + if LUSTRE_NODE_ROLE_CLIENT in v['role']: + for mp in v['role'][LUSTRE_NODE_ROLE_CLIENT]: + v['obj'].umount(mp) + for v in cleanups: + if LUSTRE_NODE_ROLE_OSS in v['role']: + for mp in v['role'][LUSTRE_NODE_ROLE_OSS]: + v['obj'].umount(mp) + for v in cleanups: + if LUSTRE_NODE_ROLE_MDT in v['role']: + for mp in v['role'][LUSTRE_NODE_ROLE_MDT]: + v['obj'].umount(mp) + for v in cleanups: + if LUSTRE_NODE_ROLE_MDS in v['role']: + for mp in v['role'][LUSTRE_NODE_ROLE_MDS]: + v['obj'].umount(mp) + for v in cleanups: + if LUSTRE_NODE_ROLE_MGS in v['role']: + for mp in v['role'][LUSTRE_NODE_ROLE_MGS]: + v['obj'].umount(mp) + diff --git a/lustre/tests/lutf/python/tests-infra/lustre_fs.py b/lustre/tests/lutf/python/tests-infra/lustre_fs.py new file mode 100644 index 0000000..226988d --- /dev/null +++ b/lustre/tests/lutf/python/tests-infra/lustre_fs.py @@ -0,0 +1,188 @@ +import os, yaml, pathlib, re, threading, time +from lutf import me +import lutf_agent +from clutf_global import * +from lutf_basetest import BaseTest +from lutf_cmd import lutf_exec_local_cmd +from lustre_node import SimpleLustreNode +from lustre_roles import * +import lnet +from fio import FioTraffic +from lutf_exception import * +from lutf_file import * +from lutf_utils import * +import lutf_common_def as common +import logging + +class SimpleLustreFS(): + def __init__(self, agent_list): + self.__lustre_nodes = {} + self.__mgs_nids = [] + self.__client_nodes = [] + self.__mgs_nodes = [] + self.__oss_nodes = [] + for agent in agent_list: 
class SimpleLustreFS():
    """Aggregate view of a small Lustre file system built from agents.

    Partitions the given agents into MGS, OSS and client nodes by the
    role each SimpleLustreNode reports, and provides helpers to
    configure networking, mount/umount targets and do verified file I/O.
    """

    def __init__(self, agent_list):
        self.__lustre_nodes = {}
        self.__mgs_nids = []
        self.__client_nodes = []
        self.__mgs_nodes = []
        self.__oss_nodes = []
        for agent in agent_list:
            self.__lustre_nodes[agent] = SimpleLustreNode(target=agent)
        for key, node in self.__lustre_nodes.items():
            role = node.get_lustre_role()
            if role == LUSTRE_NODE_ROLE_MGS:
                self.__mgs_nodes.append(node)
            elif role == LUSTRE_NODE_ROLE_OST:
                self.__oss_nodes.append(node)
            elif role == LUSTRE_NODE_ROLE_CLIENT:
                self.__client_nodes.append(node)
        if len(self.__mgs_nodes) == 0 or \
           len(self.__oss_nodes) == 0 or \
           len(self.__client_nodes) == 0:
            # BUGFIX: the original formatted this message with the
            # undefined names client/oss/mgs and died with a NameError
            # instead of reporting the bad cluster.
            raise LUTFError("Bad cluster clients = %d, OSS = %d, MGS = %d" %
                            (len(self.__client_nodes),
                             len(self.__oss_nodes),
                             len(self.__mgs_nodes)))

    def lustre_node_key_by_value(self, value):
        """Return the agent name owning node *value*, or None."""
        for key, node in self.__lustre_nodes.items():
            if value == node:
                return key
        return None

    def list_nids(self, nodes):
        """Collect the NIDs of every configured node in *nodes*."""
        nids = []
        for n in nodes:
            node_nids = n.list_nids()
            if len(node_nids) == 0:
                logging.debug("node %s wasn't configured" % n.get_node_name())
                continue
            nids += node_nids
        return nids

    def get_mgs_nodes(self):
        """Nodes acting as MGS/MDS."""
        return self.__mgs_nodes

    def get_oss_nodes(self):
        """Nodes acting as OSS."""
        return self.__oss_nodes

    def get_client_nodes(self):
        """Nodes acting as clients."""
        return self.__client_nodes

    def get_mgs_nids(self):
        """NIDs of the MGS nodes (populated by configure_nets())."""
        return self.__mgs_nids

    def mount_osses(self, num_oss=100):
        """Format (first time) and mount up to *num_oss* OSTs."""
        if num_oss < 1:
            raise LUTFError("bad parameters")
        for i in range(0, len(self.__oss_nodes)):
            if i >= num_oss:
                break
            self.__oss_nodes[i].configure_lustre(mgs_nids=self.__mgs_nids, index=i)

    def configure_nets(self, mgs={}, oss={}, client={}):
        """Configure LNet on every node.

        mgs/oss/client are {net_name: [interfaces]} maps for each node
        class; an empty map defaults to tcp over the node's first
        interface.  (Mutable defaults kept for interface compatibility;
        they are never mutated here.)
        """
        for n in self.__mgs_nodes:
            if len(mgs) == 0:
                net_map = {'tcp': [n.list_intfs()[0]]}
            else:
                net_map = mgs
            n.configure_net(net_map)
            n.commit()
        self.__mgs_nids = self.list_nids(self.__mgs_nodes)
        for n in self.__oss_nodes:
            # BUGFIX: the original tested len(mgs) here (copy/paste), so
            # a caller-supplied oss map was silently ignored.
            if len(oss) == 0:
                net_map = {'tcp': [n.list_intfs()[0]]}
            else:
                net_map = oss
            n.configure_net(net_map)
            n.commit()
        for n in self.__client_nodes:
            # BUGFIX: same copy/paste; the original tested len(mgs).
            if len(client) == 0:
                net_map = {'tcp': [n.list_intfs()[0]]}
            else:
                net_map = client
            n.configure_net(net_map)
            n.commit()

    def mount_mgses(self, num_mgs=100):
        """Format (first time) and mount up to *num_mgs* MGS/MDT targets."""
        if num_mgs < 1:
            raise LUTFError("bad parameters")
        for i in range(0, len(self.__mgs_nodes)):
            if i >= num_mgs:
                break
            self.__mgs_nodes[i].configure_lustre(index=i)

    def mount_servers(self, num_mgs=100, num_oss=100):
        """Mount MGSes then OSSes."""
        if num_mgs < 1 or num_oss < 1:
            raise LUTFError("bad parameters")
        self.mount_mgses(num_mgs)
        self.mount_osses(num_oss)

    def mount_clients(self, num=100):
        """Mount the file system on up to *num* clients."""
        for i in range(0, len(self.__client_nodes)):
            if i >= num:
                break
            self.__client_nodes[i].configure_lustre(mgs_nids=self.__mgs_nids)

    def unmount_clients(self):
        """Unmount all client mounts."""
        for c in self.__client_nodes:
            c.unconfigure_lustre()

    def umount_mgses(self):
        """Unmount all MGS targets."""
        for m in self.__mgs_nodes:
            m.unconfigure_lustre()

    def umount_osses(self):
        """Unmount all OST targets."""
        for o in self.__oss_nodes:
            o.unconfigure_lustre()

    def umount_servers(self):
        """Unmount OSTs first, then MGSes."""
        self.umount_osses()
        self.umount_mgses()

    def read_verify_work(self, index, data, path):
        """Read *path* through client *index* and compare with *data*."""
        cnode = self.lustre_node_key_by_value(self.__client_nodes[index])
        d = LutfDir(os.path.split(path)[0], target=cnode)
        files = d.listdir()
        if not os.path.split(path)[1] in files:
            raise LUTFError("file is not in directory")
        f = LutfFile(path, full_path=True, target=cnode)
        f.open('rb')
        verify = f.read()
        if data != verify:
            raise LUTFError("Failed to write contents of file")
        # NOTE(review): the file is removed without an explicit close();
        # presumably LutfFile.remove() handles that -- confirm.
        f.remove()
        return True

    def write_work(self, index, data):
        """Write *data* to a random file on client *index*; return its path."""
        # Local import: the module header does not import random.
        import random
        mount = self.__client_nodes[index].get_mount_point()
        path = os.path.join(mount, "lustrefs"+str(random.getrandbits(32)) + '.data')
        cnode = self.lustre_node_key_by_value(self.__client_nodes[index])
        f = LutfFile(path, full_path=True, target=cnode)
        f.open('wb')
        f.write(data)
        f.close()
        return path

    def write(self, index, data, thread=False, timeout=30):
        """Write *data* via client *index*, optionally in a watchdog thread.

        Raises StopIteration if the threaded write does not complete
        within *timeout* seconds.
        """
        if not thread:
            # BUGFIX: the original called the nonexistent
            # write_verify_work(); the synchronous path is write_work().
            return self.write_work(index, data)
        cmd_thrd = LutfThread('lustrefs_write', self.write_work,
                              self, index, data)
        cmd_thrd.start()
        time.sleep(timeout)
        # is_alive(): Thread.isAlive() was removed in Python 3.9.
        # NOTE(review): assumes LutfThread derives from threading.Thread.
        if cmd_thrd.is_alive():
            cmd_thrd.raise_exception()
        if not cmd_thrd.rc:
            raise StopIteration("File System Write Expired")
        return cmd_thrd.rc

    def read_verify(self, index, data, path, thread=False, timeout=30):
        """Verify *path* contains *data*, optionally in a watchdog thread."""
        if not thread:
            return self.read_verify_work(index, data, path)
        cmd_thrd = LutfThread('lustrefs_read_verify', self.read_verify_work,
                              self, index, data, path)
        cmd_thrd.start()
        time.sleep(timeout)
        if cmd_thrd.is_alive():
            cmd_thrd.raise_exception()
        if not cmd_thrd.rc:
            raise StopIteration("File System Read Expired")
        return cmd_thrd.rc
class LustreLog(BaseTest):
    '''
    This class provides methods to easily access the lustre kernel log.
    If the log buffer overflows the start mark could be lost.
    It is suggested to keep logging running for the shortest
    period possible to avoid log overflows.
    '''
    def __init__(self, script=os.path.abspath(__file__),
                 target=None):
        super().__init__(script, target=target)
        self.__started = False
        # Unique token embedded in the start/end markers of a session.
        self.__mark_unique = str(random.getrandbits(32))
        self.__start_mark = ''
        self.__end_mark = ''
        # BUGFIX: __mark is read by stop_daemon(); the original never
        # initialized it, so calling stop_daemon() out of order raised
        # AttributeError instead of the intended error.
        self.__mark = ''
        self.__daemon_logfile = ''
        self.level = self.get_level()

    def __set(self):
        # Push self.level down to the kernel.
        lutf_exec_local_cmd(LCTL + ' set_param debug='+'"'+' '.join(self.level)+'"')

    def get_level(self):
        """Return the current debug levels as a list of strings."""
        levels = lutf_exec_local_cmd(LCTL + ' get_param debug')
        levels = levels[0].decode('utf-8').replace('debug=', '')
        levels = levels.split()
        return levels

    def set_level(self, level):
        '''
        set lustre kernel log level to exactly *level* (a single string)
        '''
        self.level = []
        self.level.append(level)
        self.__set()
        self.level = self.get_level()

    def add_level(self, level):
        '''
        adds to the existing lustre log level
        '''
        if not level in self.level:
            self.level.append(level)
            self.__set()
        self.level = self.get_level()

    def sub_level(self, level):
        '''
        removes a log level
        '''
        if not level in self.level:
            return
        self.level.remove(level)
        # BUGFIX: the original passed the whole list to set_level(),
        # which expects a single level string and would have nested the
        # list (breaking the ' '.join in __set()).
        self.__set()
        self.level = self.get_level()

    def start(self):
        '''
        Start logging and mark the log with a unique marker.
        '''
        if self.__started:
            # BUGFIX: the original constructed the exception without
            # raising it, silently continuing.
            raise LUTFError("Logging already started")
        self.__start_mark = 'LUTF-START-'+self.__mark_unique
        lutf_exec_local_cmd(LCTL + ' mark '+self.__start_mark)
        self.__started = True

    def stop(self, logfile=None):
        '''
        Stops logging and marks the end of the log with a unique marker.
        Optionally dumps the kernel log to *logfile*.
        '''
        if not self.__started:
            raise LUTFError("Logging did not start")
        if len(self.__start_mark) == 0:
            raise LUTFError("Logging corrupted")
        self.__end_mark = 'LUTF-END-'+self.__mark_unique
        lutf_exec_local_cmd(LCTL + ' mark '+self.__end_mark)
        self.__started = False
        if logfile:
            lutf_exec_local_cmd(LCTL + ' dk '+logfile)
        return None

    def get_log(self):
        """Dump the kernel log and return the lines between the start
        and end markers (start-mark line included)."""
        rc = lutf_exec_local_cmd(LCTL + ' dk')
        output = rc[0].decode('utf-8')
        # parse into a list
        lines = output.split('\n')
        parsed = []
        start = False
        for l in lines:
            if self.__start_mark in l:
                start = True
            if self.__end_mark in l:
                start = False
            if start:
                parsed.append(l)
        return parsed

    def extract(self, search, parsed):
        """Return the lines of *parsed* containing the substring *search*."""
        return [l for l in parsed if search in l]

    def start_daemon(self, logfile, size):
        '''
        Start the debug daemon and mark the beginning of the logs with a
        unique marker
        '''
        if self.__started:
            raise LUTFError("Logging already started")
        lutf_exec_local_cmd(LCTL + ' debug_daemon start '+logfile+' '+str(size))
        self.__daemon_logfile = logfile
        self.__mark = 'LUTF-'+str(random.getrandbits(32))
        lutf_exec_local_cmd(LCTL + ' mark '+self.__mark)
        self.__started = True

    def stop_daemon(self):
        '''
        Stop the debug daemon and convert the daemon log into a text
        file at <daemon_logfile>.log
        '''
        if not self.__started:
            raise LUTFError("Logging did not start")
        if len(self.__mark) == 0:
            raise LUTFError("Logging corrupted")
        if len(self.__daemon_logfile) == 0:
            raise LUTFError("Daemon log file is not set")
        lutf_exec_local_cmd(LCTL + ' mark '+self.__mark)
        lutf_exec_local_cmd(LCTL + ' debug_daemon stop')
        lutf_exec_local_cmd(LCTL + ' debug_file '+self.__daemon_logfile+' '+self.__daemon_logfile+'.log')
        self.__mark = ''
        self.__started = False
import os, yaml, pathlib, re
from lutf import me
import lutf_agent
from clutf_global import *
from lutf_basetest import BaseTest
from lutf_cmd import lutf_exec_local_cmd
from lustre_roles import *
import lnet
from fio import FioTraffic
from lutf_exception import *
import logging
from utility_paths import get_mkfs, MOUNT, UMOUNT, get_lctl, lustre_rmmod, load_lustre

class SimpleLustreNode(BaseTest):
    """A single node of the test cluster typed by its Lustre role.

    The role of the node is determined from the node name, which is
    derived from the tests config file.  Look at
    /usr/lib64/lustre/tests/cfg/lutfcfg.sh for an example.
    """

    def __init__(self, script=os.path.abspath(__file__),
                 target=None, exceptions=True):
        super().__init__(script, target=target)
        # NOTE(review): a local proxy for a remote target returns here
        # without initializing any attributes; such an instance must only
        # be used through the RPC machinery -- confirm.
        if target and me.name != target:
            return
        self.__exceptions = exceptions
        self.__lnet = lnet.TheLNet()
        self.__fio = None
        self.__client_mountp = None
        self.__ost_mountp = None
        self.__ost_formatted = False
        self.__mds_mountp = None
        self.__mds_formatted = False
        self.__mounted = False

        if self.get_nodenum(['mds(.*?)_HOST', 'mdt(.*?)_HOST', 'mgs(.*?)_HOST']):
            self.__role = LUSTRE_NODE_ROLE_MGS
        elif self.get_nodenum(['ost(.*?)_HOST', 'oss(.*?)_HOST']):
            self.__role = LUSTRE_NODE_ROLE_OST
        elif 'client'.upper() in me.name:
            self.__role = LUSTRE_NODE_ROLE_CLIENT
            self.__fio = FioTraffic()
        elif self.get_nodenum(['rtr(.*?)_HOST']):
            self.__role = LUSTRE_NODE_ROLE_ROUTER
        else:
            self.__role = LUSTRE_NODE_ROLE_UNDEFINED
            if self.__exceptions:
                raise LUTFError("Unknown Lustre node type")

    def get_node_name(self):
        """Node name as known to the LUTF."""
        return me.my_name()

    def get_node_hostname(self):
        """Host name of this node."""
        return me.my_hostname()

    def get_mount_point(self):
        """Mount point matching this node's role, or None if not mounted."""
        if self.__role == LUSTRE_NODE_ROLE_MGS:
            return self.__mds_mountp
        elif self.__role == LUSTRE_NODE_ROLE_OST:
            return self.__ost_mountp
        elif self.__role == LUSTRE_NODE_ROLE_CLIENT:
            return self.__client_mountp
        return None

    def get_peer_stats(self):
        """LNet peer statistics for this node."""
        return self.__lnet.get_peer_stats()

    def list_nids(self, match=None):
        """List this node's NIDs, optionally only those on net *match*."""
        if match:
            return self.__lnet.nets.list_nids(match)[0][match]
        else:
            # return the full list of nids across all nets
            nids = []
            nets = self.__lnet.nets.list_nids()
            # (stray debug print removed)
            for net in nets:
                for k, v in net.items():
                    nids += v
            return nids

    def list_intfs(self):
        """Names of the network interfaces available on this node."""
        return list(me.list_intfs()['interfaces'].keys())

    def get_lustre_role(self):
        """Role constant derived from the node name at construction."""
        return self.__role

    def check_down(self):
        """Raise unless LNet is completely unconfigured on this node."""
        self.__lnet.update()
        if len(self.__lnet.nets) > 0:
            raise LUTFError("Expect a clean setup")

    def configure_net(self, net_map):
        '''
        Configure based on mapping provided
        net_map = {'net_name': [devices]}
        '''
        # BUGFIX: the original accumulated all interfaces into one shared
        # list and assigned that same list to every net, so each net ended
        # up claiming every interface configured so far.
        total = 0
        for net, devs in net_map.items():
            intfs = [self.__lnet.make_net(dev) for dev in devs]
            total += len(intfs)
            self.__lnet.nets[net] = intfs
        if total <= 0:
            raise LUTFError("bad configuration. No interfaces provided")

    def configure_route(self, rt_map):
        '''
        Configure based on mapping provided
        rt_map = {'rnet': [{'gateway': gw, 'hop': hop, 'priority': prio}]}
        '''
        for k, v in rt_map.items():
            self.__lnet.routes[k] = v

    def configure_peer(self, peer_map):
        '''
        Configure based on mapping provided
        peer_map = {'Multi-Rail': mr, 'peer_ni': [nids]}
        '''
        for k, v in peer_map.items():
            peer = self.__lnet.make_peer(v['Multi-Rail'], v['peer_ni'])
            self.__lnet.peers[k] = peer

    def commit(self):
        """Apply the staged LNet configuration and verify it took effect."""
        self.__lnet.configure()
        L1 = lnet.TheLNet()
        L1.update()
        if not self.__lnet.nets == L1.nets:
            raise LUTFError("Failed to configure LNet")

    def configure_lustre(self, mgs_nids=None, index=0, force_format=False):
        '''
        Configure the mount based on the environment variables set
        '''
        if self.__role == LUSTRE_NODE_ROLE_MGS:
            self.__configure_mds(force_format, mgs_nids=mgs_nids, index=index)
        elif self.__role == LUSTRE_NODE_ROLE_OST:
            self.__configure_ost(force_format, mgs_nids=mgs_nids, index=index)
        elif self.__role == LUSTRE_NODE_ROLE_CLIENT:
            self.__configure_client(mgs_nids)
        self.__mounted = True

    def unconfigure_lustre(self):
        '''
        Unconfigure Lustre module
        '''
        if self.__role == LUSTRE_NODE_ROLE_MGS:
            self.__unconfigure_mount(self.__mds_mountp)
        elif self.__role == LUSTRE_NODE_ROLE_OST:
            self.__unconfigure_mount(self.__ost_mountp)
        elif self.__role == LUSTRE_NODE_ROLE_CLIENT:
            self.__unconfigure_mount(self.__client_mountp)
        self.__mounted = False

    def ismounted(self):
        """True after configure_lustre() succeeded on this node."""
        return self.__mounted

    def start_traffic(self, runtime=30, rw='write', blocksize='1M', numjobs=1):
        """Start fio traffic on the client mount point."""
        if not self.__fio:
            err = "Traffic can only be generated from clients"
            # BUGFIX: the attribute is __exceptions; the original read the
            # nonexistent self.__exception and raised AttributeError
            # instead of reporting the real error.
            if self.__exceptions:
                raise LUTFError(err)
            return False, err

        self.__fio.load(self.__client_mountp, runtime=runtime, rw=rw, blocksize=blocksize, numjobs=numjobs)
        self.__fio.start()

    def stop_traffic(self):
        """Stop previously started fio traffic."""
        self.__fio.stop()

    def get_nodenum(self, patterns):
        """Return the node number extracted from me.name using the first
        matching pattern in *patterns*, or 0 when none match."""
        m = None
        num = 1
        for e in patterns:
            m = re.search(e.upper(), me.name.upper())
            if m:
                # An empty capture (e.g. 'MDS_HOST') counts as node 1.
                if len(m[1]) > 0:
                    num = int(m[1])
                else:
                    num = 1
                break
        if not m:
            return 0
        return num

    def __configure_mds(self, force_format, mgs_nids=None, index=0):
        # Load modules and create the mount point.
        load_lustre()
        local_path = get_lutf_tmp_dir()
        pathlib.Path(local_path).mkdir(parents=True, exist_ok=True)
        self.__mds_mountp = os.path.join(local_path, 'mnt', 'mds')
        pathlib.Path(self.__mds_mountp).mkdir(parents=True, exist_ok=True)
        mds_num = self.get_nodenum(['mds(.*?)_HOST', 'mdt(.*?)_HOST', 'mgs(.*?)_HOST'])
        if mds_num <= 0:
            raise LUTFError("Couldn't find an mds")
        # The backing device comes from the test environment, e.g. MDSDEV1.
        dev = os.environ['MDSDEV'+str(mds_num)]
        if force_format or not self.__mds_formatted:
            if not mgs_nids:
                lutf_exec_local_cmd(get_mkfs() + ' --fsname=lustrewt --index='+str(index)+' --reformat --mgs --mdt '+ dev)
            elif type(mgs_nids) == list:
                servicenode = ':'.join(mgs_nids)
                lutf_exec_local_cmd(get_mkfs() + ' --fsname=lustrewt --index='+str(index)+' --reformat --mgs --mdt --servicenode='+servicenode+' ' + dev)
            else:
                # BUGFIX: the original built this exception without
                # raising it and silently fell through to the mount.
                raise LUTFError("Unexpected parameter type: 'mgs_nids'")
            self.__mds_formatted = True
        lutf_exec_local_cmd(MOUNT + ' -t lustre ' + dev + ' ' + self.__mds_mountp)

    def __configure_ost(self, force_format, mgs_nids, index=0):
        if not mgs_nids:
            raise LUTFError("mgs nids must be provided")
        if type(mgs_nids) == list:
            nids = ':'.join(mgs_nids)
        elif type(mgs_nids) == str:
            nids = mgs_nids
        else:
            raise LUTFError("mgs_nids of unknown type")

        load_lustre()
        # create mount point
        local_path = get_lutf_tmp_dir()
        pathlib.Path(local_path).mkdir(parents=True, exist_ok=True)
        self.__ost_mountp = os.path.join(local_path, 'mnt', 'ost')
        pathlib.Path(self.__ost_mountp).mkdir(parents=True, exist_ok=True)
        ost_num = self.get_nodenum(['oss(.*?)_HOST', 'ost(.*?)_HOST'])
        if ost_num <= 0:
            raise LUTFError("Couldn't find an ost")
        dev = os.environ['OSTDEV'+str(ost_num)]
        if force_format or not self.__ost_formatted:
            # BUGFIX: the original hard-coded --index=0 even though the
            # caller passes a distinct index per OST.
            lutf_exec_local_cmd(get_mkfs() + ' --ost --fsname=lustrewt --index='+str(index)+' --reformat --mgsnode='+nids+' '+ dev)
            self.__ost_formatted = True
        lutf_exec_local_cmd(MOUNT + ' -t lustre ' + dev + ' ' + self.__ost_mountp)

    def __configure_client(self, mgs_nids):
        if not mgs_nids:
            raise LUTFError("mgs nids must be provided")
        if type(mgs_nids) == list:
            nids = ':'.join(mgs_nids)
        elif type(mgs_nids) == str:
            nids = mgs_nids
        else:
            raise LUTFError("mgs_nids of unknown type")

        load_lustre()
        # create mount point
        local_path = get_lutf_tmp_dir()
        pathlib.Path(local_path).mkdir(parents=True, exist_ok=True)
        self.__client_mountp = os.path.join(local_path, 'mnt', 'lustre')
        pathlib.Path(self.__client_mountp).mkdir(parents=True, exist_ok=True)
        lutf_exec_local_cmd(MOUNT + ' -t lustre ' + nids + ':/lustrewt ' + self.__client_mountp)

    def __unconfigure_mount(self, mount_point):
        # Best-effort teardown: umount failures and module-unload
        # failures are logged, not raised.
        if not mount_point:
            logging.debug("No mount point provided")
            return
        try:
            logging.debug('umount ' + mount_point)
            lutf_exec_local_cmd(UMOUNT + ' ' + mount_point)
        except Exception:
            logging.debug('Failed to umount ' + mount_point)
        try:
            lustre_rmmod()
        except Exception:
            logging.debug('Failed to lustre_rmmod')

    def set_dynamic_nids(self, value):
        """Set the mgc dynamic_nids tunable on this node."""
        lutf_exec_local_cmd(get_lctl() + ' set_param mgc.*.dynamic_nids='+str(value))

    def get_dynamic_nids(self):
        """Read back the mgc dynamic_nids tunable as an int."""
        rc = lutf_exec_local_cmd(get_lctl() + ' get_param mgc.*.dynamic_nids')
        return int(rc[0].decode('utf-8').split('=')[1].strip())
import os, time, logging
from lutf_common_def import get_lustre_base_path
from lutf_cmd import lutf_exec_local_cmd

# Resolved by set_default_paths(); empty until then.
base_lustre = ''
LNETCTL = ''
LCTL = ''
LUSTRE_RMMOD = ''
MKFS = ''

def get_lnetctl():
    """Path of the lnetctl utility currently in use."""
    return LNETCTL

def get_lctl():
    """Path of the lctl utility currently in use."""
    return LCTL

def get_mkfs():
    """Path of the mkfs.lustre utility currently in use."""
    return MKFS

def get_lustre_rmmod():
    """Path of the lustre_rmmod script currently in use."""
    return LUSTRE_RMMOD

def set_default_paths():
    """Point the utility path globals at the lustre build tree when one
    is available, otherwise at the installed /usr/sbin binaries."""
    global base_lustre
    global LNETCTL
    global LCTL
    global LUSTRE_RMMOD
    global MKFS

    base_lustre = get_lustre_base_path()
    in_tree = False

    if base_lustre:
        LNETCTL = os.path.join(base_lustre, 'lnet', 'utils', 'lnetctl')
        LCTL = os.path.join(base_lustre, 'lustre', 'utils', 'lctl')
        LUSTRE_RMMOD = os.path.join(base_lustre, 'lustre', 'scripts', 'lustre_rmmod')
        MKFS = os.path.join(base_lustre, 'lustre', 'utils', 'mkfs.lustre')
        # the assumption is that if we're working from the home directory
        # then if one utility is present all of them are present.  We don't
        # support a hybrid environment, where some lustre utilities are
        # from the build directory and others are installed
        in_tree = os.path.isfile(LNETCTL)

    if not in_tree:
        sbin = os.path.join(os.path.sep, 'usr', 'sbin')
        LNETCTL = os.path.join(sbin, 'lnetctl')
        LCTL = os.path.join(sbin, 'lctl')
        LUSTRE_RMMOD = os.path.join(sbin, 'lustre_rmmod')
        MKFS = os.path.join(sbin, 'mkfs.lustre')


# System utilities assumed to live in their standard locations.
MODPROBE = os.path.join(os.path.sep, 'usr', 'sbin', 'modprobe')
INSMOD = os.path.join(os.path.sep, 'usr', 'sbin', 'insmod')
LSMOD = os.path.join(os.path.sep, 'usr', 'sbin', 'lsmod')
RMMOD = os.path.join(os.path.sep, 'usr', 'sbin', 'rmmod')
MOUNT = os.path.join(os.path.sep, 'usr', 'bin', 'mount')
UMOUNT = os.path.join(os.path.sep, 'usr', 'bin', 'umount')
CAT = os.path.join(os.path.sep, 'usr', 'bin', 'cat')
MAN = os.path.join(os.path.sep, 'usr', 'bin', 'man')

set_default_paths()

# Full lustre module stack, in load order, relative to the build tree.
lmodules=[
'libcfs/libcfs/libcfs.ko',
'lnet/klnds/socklnd/ksocklnd.ko',
'lnet/lnet/lnet.ko',
'lnet/selftest/lnet_selftest.ko',
'lustre/obdclass/obdclass.ko',
'lustre/ptlrpc/ptlrpc.ko',
'lustre/fld/fld.ko',
'lustre/fid/fid.ko',
'lustre/obdclass/llog_test.ko',
'lustre/ptlrpc/gss/ptlrpc_gss.ko',
'lustre/obdecho/obdecho.ko',
'lustre/mgc/mgc.ko',
'lustre/red/red.ko',
'lustre/tests/kernel/kinode.ko',
'lustre/ost/ost.ko',
'lustre/mgs/mgs.ko',
'lustre/lfsck/lfsck.ko',
'lustre/quota/lquota.ko',
'lustre/mdt/mdt.ko',
'lustre/mdd/mdd.ko',
'lustre/ofd/ofd.ko',
'lustre/osp/osp.ko',
'lustre/lod/lod.ko',
'lustre/lov/lov.ko',
'lustre/osc/osc.ko',
'lustre/mdc/mdc.ko',
'lustre/lmv/lmv.ko',
'lustre/llite/lustre.ko',
'ldiskfs/ldiskfs.ko',
'lustre/osd-ldiskfs/osd_ldiskfs.ko',
]

# LNet-only subset of the module stack, in load order.
lnetmodules=['libcfs/libcfs/libcfs.ko',
'lnet/lnet/lnet.ko',
'lnet/klnds/socklnd/ksocklnd.ko',
'lnet/selftest/lnet_selftest.ko']
def load_lnet(modparams=None):
    """Load the LNet module stack and run 'lnetctl lnet configure'.

    modparams maps a module file name (e.g. 'lnet.ko') to a dict of
    module parameters to pass on the insmod command line.  When no build
    tree is available, falls back to modprobe against the installed
    modules.
    """
    global base_lustre
    global LNETCTL
    global LCTL
    global LUSTRE_RMMOD
    global MKFS

    # Avoid the shared-mutable-default pitfall of 'modparams={}'.
    if modparams is None:
        modparams = {}

    set_default_paths()

    logging.critical("utility_paths::load_lnet")

    modules = lutf_exec_local_cmd(LSMOD)
    if modules and len(modules) >= 2:
        logging.critical(str(modules[0]) + "\nrc = " + str(modules[1]))

    if not base_lustre or \
       not os.path.isfile(os.path.join(base_lustre, 'lnet', 'lnet', 'lnet.ko')):
        # No build tree: rely on the installed modules.
        lutf_exec_local_cmd(MODPROBE + ' lnet')
        # configure lnet. No extra module parameters loaded
        lutf_exec_local_cmd(LNETCTL + " lnet configure")
        return

    for module in lnetmodules:
        cmd = INSMOD + ' ' + os.path.join(base_lustre, module)
        params = modparams.get(os.path.basename(module))
        if params:
            for k, v in params.items():
                cmd += ' ' + k + '=' + str(v)
        lutf_exec_local_cmd(cmd, exception=False)

    rc = lutf_exec_local_cmd(LNETCTL + " net show")
    if rc:
        logging.critical(str(rc[0].decode('utf-8')))
    lutf_exec_local_cmd(LNETCTL + " lnet configure")
    # Drop the default tcp net that 'lnet configure' may have created.
    lutf_exec_local_cmd(LNETCTL + " net del --net tcp", exception=False)
    rc = lutf_exec_local_cmd(LNETCTL + " net show")
    if rc:
        logging.critical(str(rc[0].decode('utf-8')))

def load_lustre(modparams=None):
    """Load the full Lustre module stack.

    Same contract as load_lnet(); uses the build tree when available,
    otherwise 'modprobe lustre'.
    """
    global base_lustre
    global LNETCTL
    global LCTL
    global LUSTRE_RMMOD
    global MKFS

    if modparams is None:
        modparams = {}

    set_default_paths()

    logging.critical("utility_paths::load_lustre")

    if not base_lustre or \
       not os.path.isfile(os.path.join(base_lustre, 'lustre', 'llite', 'lustre.ko')):
        lutf_exec_local_cmd(MODPROBE + ' lustre', exception=False)
        return

    # ldiskfs prerequisites which are not part of the lustre tree.
    # Use the MODPROBE constant rather than relying on PATH.
    lutf_exec_local_cmd(MODPROBE + ' crc_t10dif')
    # BUGFIX: the kernel version was hard-coded to one el7 kernel
    # (3.10.0-1062.9.1.el7.x86_64); build the path from the running
    # kernel instead so this works anywhere.
    kmod_fs = os.path.join(os.path.sep, 'lib', 'modules',
                           os.uname().release, 'kernel', 'fs')
    lutf_exec_local_cmd(INSMOD + ' ' + os.path.join(kmod_fs, 'jbd2', 'jbd2.ko.xz'),
                        exception=False)
    lutf_exec_local_cmd(INSMOD + ' ' + os.path.join(kmod_fs, 'mbcache.ko.xz'),
                        exception=False)
    for module in lmodules:
        cmd = INSMOD + ' ' + os.path.join(base_lustre, module)
        params = modparams.get(os.path.basename(module))
        if params:
            for k, v in params.items():
                cmd += ' ' + k + '=' + str(v)
        lutf_exec_local_cmd(cmd, exception=False)

def lustre_rmmod():
    """Best-effort teardown of all lustre/lnet modules.

    Retries lustre_rmmod up to five times (module references can take a
    moment to drain).  Returns True when the modules unloaded cleanly,
    False otherwise.
    """
    global base_lustre
    global LNETCTL
    global LCTL
    global LUSTRE_RMMOD
    global MKFS

    logging.critical("utility_paths::lustre_rmmod()")
    set_default_paths()

    logging.critical("lustre_rmmod::" + LUSTRE_RMMOD)
    lutf_exec_local_cmd(LUSTRE_RMMOD, exception=False)
    logging.critical("lustre_rmmod::" + RMMOD + " lnet_selftest")
    lutf_exec_local_cmd(RMMOD + " lnet_selftest", exception=False)
    logging.critical("lustre_rmmod::" + LNETCTL + " lnet unconfigure")
    lutf_exec_local_cmd(LNETCTL + " lnet unconfigure", exception=False)

    def try_rmmod():
        # lustre_rmmod exits non-zero while modules are still busy.
        try:
            return lutf_exec_local_cmd(LUSTRE_RMMOD)
        except Exception:
            return [None, -1]

    rc = try_rmmod()
    attempts = 0
    while rc[1] != 0 and attempts < 5:
        time.sleep(1)
        rc = try_rmmod()
        attempts += 1
    return rc[1] == 0
a/lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py b/lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py old mode 100755 new mode 100644 -- 1.8.3.1