Whamcloud - gitweb
LU-10973 lnet: initial LUTF C infrastructure 86/38086/41
authorAmir Shehata <ashehata@whamcloud.com>
Wed, 25 Mar 2020 02:07:58 +0000 (19:07 -0700)
committerOleg Drokin <green@whamcloud.com>
Sat, 13 Mar 2021 18:33:54 +0000 (18:33 +0000)
LNet Unit test Framework is a utility that functionally tests LNet
via python scripts. It operates in a master/slave configuration.
Slaves run on multiple test nodes, while the master is responsible
for managing the slaves to perform specific tests.

The LUTF exercises the different LNet features via configuring
LNet through the lnetconfig interface or lnetctl, running traffic
and monitoring statistics and other logging to ensure that tests
have passed.

Test-Parameters: trivial
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: Iefcc4d48d5f144a2abe1fdc0865331e9a9d27318
Reviewed-on: https://review.whamcloud.com/38086
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Serguei Smirnov <ssmirnov@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
28 files changed:
config/lustre-build.m4
debian/control
debian/control.main
lustre.spec.in
lustre/autoconf/lustre-core.m4
lustre/tests/Makefile.am
lustre/tests/lutf/Makefile.am [new file with mode: 0644]
lustre/tests/lutf/src/Makefile.am [new file with mode: 0644]
lustre/tests/lutf/src/liblutf_agent.c [new file with mode: 0644]
lustre/tests/lutf/src/liblutf_connect.c [new file with mode: 0644]
lustre/tests/lutf/src/liblutf_global.c [new file with mode: 0644]
lustre/tests/lutf/src/lutf.c [new file with mode: 0644]
lustre/tests/lutf/src/lutf.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_agent.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_agent.swg [new file with mode: 0644]
lustre/tests/lutf/src/lutf_common.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_global.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_global.swg [new file with mode: 0644]
lustre/tests/lutf/src/lutf_listener.c [new file with mode: 0644]
lustre/tests/lutf/src/lutf_listener.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_message.h [new file with mode: 0644]
lustre/tests/lutf/src/lutf_python.c [new file with mode: 0644]
lustre/tests/lutf/src/lutf_python.h [new file with mode: 0644]
lustre/tests/lutf/swig_templates/generate_lnetconfig_swig_i.py [new file with mode: 0755]
lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py [new file with mode: 0755]
lustre/tests/lutf/swig_templates/liblnetconfig.template [new file with mode: 0644]
lustre/tests/lutf/swig_templates/lutf_missing_definitions.h [new file with mode: 0755]
lustre/tests/lutf/swig_templates/typemap.template [new file with mode: 0755]

index e963320..f2007de 100644 (file)
@@ -344,8 +344,8 @@ AS_IF([test $target_cpu = powerpc64], [
        CC="$CC -m64"
 ])
 
-# libcfs/include for util headers, lnetconfig headers, lustre/include for liblustreapi and friends
-CPPFLAGS="-I$PWD/libcfs/include -I$PWD/lnet/utils -I$PWD/lustre/include $CPPFLAGS"
+# UAPI headers, libcfs/include for util headers, lustre/include for liblustreapi and friends
+CPPFLAGS="-I$PWD/libcfs/include -I$PWD/lnet/utils/ -I$PWD/lnet/include/uapi -I$PWD/lustre/include -I$PWD/lustre/include/uapi $CPPFLAGS"
 
 CCASFLAGS="-Wall -fPIC -D_GNU_SOURCE"
 AC_SUBST(CCASFLAGS)
index 77a22a2..33a3d50 100644 (file)
@@ -4,7 +4,7 @@ Priority: optional
 Maintainer: Brian J. Murrell <brian.murrell@intel.com>
 Uploaders: Brian J. Murrell <brian.murrell@intel.com>
 Standards-Version: 3.8.3
-Build-Depends: module-assistant, libreadline-dev, debhelper (>=9.0.0), dpatch, automake (>=1.7) | automake1.7 | automake1.8 | automake1.9, pkg-config, libtool, libyaml-dev, libselinux-dev, libsnmp-dev, mpi-default-dev, bzip2, quilt, linux-headers-generic | linux-headers | linux-headers-amd64, rsync, libssl-dev
+Build-Depends: module-assistant, libreadline-dev, debhelper (>=9.0.0), dpatch, automake (>=1.7) | automake1.7 | automake1.8 | automake1.9, pkg-config, libtool, libyaml-dev, libselinux-dev, libsnmp-dev, mpi-default-dev, bzip2, quilt, linux-headers-generic | linux-headers | linux-headers-amd64, rsync, libssl-dev, libpython3-dev, swig
 Homepage: https://wiki.whamcloud.com/
 Vcs-Git: git://git.whamcloud.com/fs/lustre-release.git
 
@@ -84,7 +84,7 @@ Package: lustre-tests
 Section: utils
 Architecture: i386 armhf powerpc ppc64el amd64 ia64 arm64
 Priority: optional
-Depends: lustre-iokit (= ${binary:Version}), lustre-dev (= ${binary:Version}), attr, rsync, perl, lsof, mpi-default-bin, selinux-utils
+Depends: lustre-iokit (= ${binary:Version}), lustre-dev (= ${binary:Version}), attr, rsync, perl, lsof, mpi-default-bin, selinux-utils, python3
 Description: Test suite for the Lustre filesystem
  Lustre is a scalable, secure, robust, highly-available cluster file system.
  This release is maintained by Whamcloud and available from
index 77a22a2..33a3d50 100644 (file)
@@ -4,7 +4,7 @@ Priority: optional
 Maintainer: Brian J. Murrell <brian.murrell@intel.com>
 Uploaders: Brian J. Murrell <brian.murrell@intel.com>
 Standards-Version: 3.8.3
-Build-Depends: module-assistant, libreadline-dev, debhelper (>=9.0.0), dpatch, automake (>=1.7) | automake1.7 | automake1.8 | automake1.9, pkg-config, libtool, libyaml-dev, libselinux-dev, libsnmp-dev, mpi-default-dev, bzip2, quilt, linux-headers-generic | linux-headers | linux-headers-amd64, rsync, libssl-dev
+Build-Depends: module-assistant, libreadline-dev, debhelper (>=9.0.0), dpatch, automake (>=1.7) | automake1.7 | automake1.8 | automake1.9, pkg-config, libtool, libyaml-dev, libselinux-dev, libsnmp-dev, mpi-default-dev, bzip2, quilt, linux-headers-generic | linux-headers | linux-headers-amd64, rsync, libssl-dev, libpython3-dev, swig
 Homepage: https://wiki.whamcloud.com/
 Vcs-Git: git://git.whamcloud.com/fs/lustre-release.git
 
@@ -84,7 +84,7 @@ Package: lustre-tests
 Section: utils
 Architecture: i386 armhf powerpc ppc64el amd64 ia64 arm64
 Priority: optional
-Depends: lustre-iokit (= ${binary:Version}), lustre-dev (= ${binary:Version}), attr, rsync, perl, lsof, mpi-default-bin, selinux-utils
+Depends: lustre-iokit (= ${binary:Version}), lustre-dev (= ${binary:Version}), attr, rsync, perl, lsof, mpi-default-bin, selinux-utils, python3
 Description: Test suite for the Lustre filesystem
  Lustre is a scalable, secure, robust, highly-available cluster file system.
  This release is maintained by Whamcloud and available from
index 486c3a4..3ccdae3 100644 (file)
@@ -191,6 +191,10 @@ Requires: %{requires_kmod_name} = %{requires_kmod_version}
 %endif
 Requires: zlib
 Requires: %{requires_yaml_name}
+%if %{with lustre_tests_lutf}
+Requires: python3 >= 3.6.0
+BuildRequires: python3-devel >= 3.6.0, swig
+%endif
 BuildRequires: libtool libyaml-devel zlib-devel binutils-devel
 %if %{_vendor}=="redhat"
 BuildRequires: redhat-rpm-config
@@ -348,6 +352,9 @@ Requires: lustre-devel = %{version}
 Requires: %{requires_kmod_name} = %{requires_kmod_version}
 Requires: %{requires_kmod_tests_name} = %{requires_kmod_version}
 %endif
+%if %{with lustre_tests_lutf}
+Requires: python3 >= 3.6.0
+%endif
 Requires: attr, rsync, perl, lsof, /usr/bin/getconf
 %if %{with mpi}
 %if %{mpi_name} == "mpich"
@@ -606,6 +613,13 @@ echo '%{_sbindir}/wiretest' >>lustre-tests.files
 if [ -n "$MPI_BIN" ]; then
        echo "$MPI_BIN/*" >>lustre-tests.files
 fi
+%if %{with lustre_tests_lutf}
+echo '%{_libdir}/lustre/tests/lutf/*' >>lustre-tests.files
+%endif
+%endif
+
+%if %{with lustre_tests_lutf}
+echo '%{_libdir}/lustre/tests/lutf/*' >>lustre-tests.files
 %endif
 
 %files devel -f lustre-devel.files
@@ -712,6 +726,7 @@ fi
 %defattr(-, root, root)
 %{_bindir}/iokit-config
 %{_bindir}/iokit-gather-stats
+
 %{_bindir}/iokit-libecho
 %{_bindir}/iokit-lstats
 %{_bindir}/iokit-parse-ior
index f61b6a4..b36260f 100644 (file)
@@ -2978,6 +2978,8 @@ lustre/scripts/Makefile
 lustre/scripts/systemd/Makefile
 lustre/tests/Makefile
 lustre/tests/mpi/Makefile
+lustre/tests/lutf/Makefile
+lustre/tests/lutf/src/Makefile
 lustre/tests/kernel/Makefile
 lustre/tests/kernel/autoMakefile
 lustre/utils/Makefile
index 598435f..8353687 100644 (file)
@@ -3,6 +3,7 @@ AM_CFLAGS := -fPIC -D_GNU_SOURCE \
             -D_LARGEFILE64_SOURCE=1 -D_FILE_OFFSET_BITS=64
 
 DIST_SUBDIRS = mpi
+DIST_SUBDIRS += lutf
 
 noinst_DATA = disk2_4-ldiskfs.tar.bz2 disk2_4-zfs.tar.bz2
 noinst_DATA += disk2_7-ldiskfs.tar.bz2 disk2_7-zfs.tar.bz2
@@ -88,6 +89,15 @@ if MPITESTS
 SUBDIRS = mpi
 endif
 
+# Build LUTF only if the packages are available
+if BUILD_LUTF
+if MPITESTS
+SUBDIRS += lutf
+else
+SUBDIRS = lutf
+endif
+endif # BUILD_LUTF
+
 bin_PROGRAMS = mcreate munlink statx
 testdir = $(libdir)/lustre/tests
 test_SCRIPTS = $(noinst_SCRIPTS)
diff --git a/lustre/tests/lutf/Makefile.am b/lustre/tests/lutf/Makefile.am
new file mode 100644 (file)
index 0000000..1485644
--- /dev/null
@@ -0,0 +1,17 @@
+SUBDIRS = src
+DIST_SUBDIRS = src
+
+nobase_noinst_DATA = swig_templates/lutf_missing_definitions.h
+nobase_noinst_DATA += swig_templates/typemap.template
+nobase_noinst_DATA += swig_templates/liblnetconfig.template
+nobase_noinst_DATA += swig_templates/generate_lutf_swig_i.py
+nobase_noinst_DATA += swig_templates/generate_lnetconfig_swig_i.py
+nobase_noinst_DATA += src/lutf_agent.swg
+nobase_noinst_DATA += src/lutf_global.swg
+nobase_noinst_DATA += src/liblutf_connect.c
+nobase_noinst_DATA += src/liblutf_agent.c
+nobase_noinst_DATA += src/liblutf_global.c
+EXTRA_DIST=$(nobase_noinst_DATA)
+
+noinst_testdir = $(libdir)/lustre/tests/lutf
+nobase_noinst_test_DATA = $(nobase_noinst_DATA)
diff --git a/lustre/tests/lutf/src/Makefile.am b/lustre/tests/lutf/src/Makefile.am
new file mode 100644 (file)
index 0000000..9018ee5
--- /dev/null
@@ -0,0 +1,120 @@
+# Administration utilities Makefile
+
+PYTHON_BIN=python$(PYTHON_VERSION)
+SWIG_COMPFLAGS=-g -Wall -fPIC -c
+SWIG_FLAGS=-D__x86_64__
+SWIG_INCLUDES=-I/usr/include -I/usr/include/linux -I/usr/include/c++/4.4.4/tr1 -I/usr/include/c++/4.4.4/cstdbool
+DLC_SWIG_FLAGS=-D__x86_64__  -D__arch_lib__ -D_LARGEFILE64_SOURCE=1
+DLC_SWIG_INCLUDES=-I/usr/include -I/usr/include/asm -I/usr/include/linux -I/usr/lib64/gcc/i686-pc-mingw32/4.4.6/include/
+DLC_SWIG_INCLUDES+=-I$(top_builddir)/lnet/utils/lnetconfig -I$(top_builddir)/lnet/utils -I$(top_builddir)/lnet/include -I$(top_builddir)/libcfs/include  -I$(top_builddir)/lnet/include/uapi/
+DLC_INCLUDES=-I/usr/include -I$(top_builddir)/lnet/utils/lnetconfig -I$(top_builddir)/lnet/utils -I$(top_builddir)/lnet/include -I$(top_builddir)/libcfs/include -I$(top_builddir)/lnet/include/uapi/
+
+LIBCFS= $(top_builddir)/libcfs/libcfs/.libs/libcfs.a
+LIBLNETCONFIG=-L$(top_builddir)/lnet/utils/lnetconfig/.libs/
+LIBLUTF=-L$(top_builddir)/lustre/tests/lutf/src/
+
+LUTF_AGENT_I=$(top_builddir)/lustre/tests/lutf/src/lutf_agent.i
+LUTF_AGENT_SWG=$(top_builddir)/lustre/tests/lutf/src/lutf_agent.swg
+LUTF_GLOBAL_I=$(top_builddir)/lustre/tests/lutf/src/lutf_global.i
+LUTF_GLOBAL_SWG=$(top_builddir)/lustre/tests/lutf/src/lutf_global.swg
+LNETCONFIG_I=$(top_builddir)/lustre/tests/lutf/src/liblnetconfig.i
+GEN_SWIG_TEMPLATES=$(top_builddir)/lustre/tests/lutf/swig_templates
+UPDATE_LUTF_SWIG_INTF=$(GEN_SWIG_TEMPLATES)/generate_lutf_swig_i.py
+GEN_SWIG_INTF_PY=$(GEN_SWIG_TEMPLATES)/generate_lnetconfig_swig_i.py
+LIBLNETCONFIG_WRAP_I=$(top_builddir)/lustre/tests/lutf/src/liblnetconfig_wrap.c
+LIBLNETCONFIG_WRAP_OBJ=$(top_builddir)/lustre/tests/lutf/src/liblnetconfig_wrap.o
+LIBLUTF_AGENT_C=$(top_builddir)/lustre/tests/lutf/src/liblutf_agent.c
+LIBLUTF_AGENT_OBJ=$(top_builddir)/lustre/tests/lutf/src/liblutf_agent.o
+LIBLUTF_CONNECT_C=$(top_builddir)/lustre/tests/lutf/src/liblutf_connect.c
+LIBLUTF_CONNECT_OBJ=$(top_builddir)/lustre/tests/lutf/src/liblutf_connect.o
+LIBLUTF_GLOBAL_C=$(top_builddir)/lustre/tests/lutf/src/liblutf_global.c
+LIBLUTF_GLOBAL_OBJ=$(top_builddir)/lustre/tests/lutf/src/liblutf_global.o
+
+CYAML_SRC=$(top_builddir)/lnet/utils/lnetconfig/cyaml.c
+CYAML_OBJ=$(top_builddir)/lustre/tests/lutf/src/cyaml.o
+
+clean-local:
+       rm -Rf *.so *_wrap.c *.py *.i
+
+noinst_PROGRAMS = lutf
+
+# LUTF sources
+lutf_SOURCES = lutf.c lutf_listener.c lutf_python.c
+lutf_SOURCES += lutf_global.h lutf_agent.h lutf_common.h lutf.h lutf_listener.h
+lutf_SOURCES += lutf_message.h lutf_python.h
+lutf_CPPFLAGS := $(PYTHON_CPPFLAGS)
+lutf_LDADD := $(top_builddir)/lnet/utils/lnetconfig/liblnetconfig.la \
+               $(LIBCFS) $(LIBREADLINE) $(LIBEFENCE) -lyaml -lm -llutf_agent -llutf_connect -llutf_global $(PYTHON_EXTRA_LIBS)
+lutf_LDFLAGS := $(PYTHON_LIBS) -L$(top_builddir)/lustre/tests/lutf/src
+
+# 1. generate the lutf_agent_wrap.c and py files
+# 2. build the lutf_agent plugin
+# 3. generate the DLC plugin
+lutf_DEPENDENCIES := _clutf_agent.so _clutf_global.so dlc_glue
+
+update_swig_intf :
+       echo "updating lutf_agent.i"
+       $(PYTHON_BIN) $(UPDATE_LUTF_SWIG_INTF) $(GEN_SWIG_TEMPLATES) $(LUTF_AGENT_SWG)
+       echo "updating lutf_global.i"
+       $(PYTHON_BIN) $(UPDATE_LUTF_SWIG_INTF) $(GEN_SWIG_TEMPLATES) $(LUTF_GLOBAL_SWG)
+
+lutf_agent_wrap.c : update_swig_intf
+       echo "generating lutf_agent_warp.c"
+       $(SWIG) -python -includeall $(SWIG_FLAGS) $(SWIG_INCLUDES) $(LUTF_AGENT_I)
+       echo "building lutf_agent_wrap.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $@
+
+_clutf_agent.so : liblutf_agent.so liblutf_connect.so update_swig_intf
+       echo "linking _clutf_agent.so"
+       $(CC) -shared lutf_agent_wrap.o -o $@ $(LIBLUTF) -llutf_agent -llutf_connect
+
+liblutf_agent.so : lutf_agent_wrap.c
+       echo "building liblutf_agent.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $(DLC_INCLUDES) $(CYAML_SRC) $(LIBLUTF_AGENT_C)
+       echo "linking liblutf_agent.so"
+       $(CC) -shared $(LIBLUTF_AGENT_OBJ) $(CYAML_OBJ) -o $@
+
+lutf_global_wrap.c : update_swig_intf
+       echo "generating lutf_global_warp.c"
+       $(SWIG) -python -includeall $(SWIG_FLAGS) $(SWIG_INCLUDES) $(LUTF_GLOBAL_I)
+       echo "building lutf_global_wrap.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $(DLC_INCLUDES) $@
+
+_clutf_global.so : liblutf_global.so
+       echo "linking _clutf_global.so"
+       $(CC) -shared lutf_global_wrap.o -o $@ $(LIBLUTF) -llutf_global
+
+liblutf_global.so : lutf_global_wrap.c
+       echo "building liblutf_global.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $(LIBLUTF_GLOBAL_C)
+       echo "linking liblutf_global.so"
+       $(CC) -shared $(LIBLUTF_GLOBAL_OBJ) -o $@
+
+liblutf_connect.so :
+       echo "building liblutf_connect.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $(LIBLUTF_CONNECT_C)
+       echo "linking liblutf_connect.so"
+       $(CC) -shared $(LIBLUTF_CONNECT_OBJ) -o $@
+
+dlc_glue:
+       echo "generating liblnetconfig.i"
+       $(PYTHON_BIN) $(GEN_SWIG_INTF_PY) $(top_builddir)
+       echo "generating liblndconfig_wrap.c"
+       $(SWIG) -python $(DLC_SWIG_FLAGS) $(DLC_SWIG_INCLUDES) $(LNETCONFIG_I)
+       echo "building liblnetconfig_wrap.c"
+       $(CC) $(SWIG_COMPFLAGS) $(PYTHON_CPPFLAGS) $(DLC_INCLUDES) $(LIBLNETCONFIG_WRAP_I) $(CYAML_SRC)
+       echo "linking _lnetconfig.so"
+       $(CC) $(LIBLNETCONFIG) -lyaml -llnetconfig -Wl,--whole-archive $(LIBCFS) $(CYAML_OBJ) -Wl,--no-whole-archive -shared -o _lnetconfig.so $(LIBLNETCONFIG_WRAP_OBJ) $(PYTHON_EXTRA_LIBS) $(PYTHON_LIBS)
+
+install-exec-local:
+       $(mkinstalldirs) $(DESTDIR)$(libdir)
+       mkdir -p $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp .libs/lutf  $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp _lnetconfig.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp _clutf_global.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp liblutf_global.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp liblutf_connect.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp _clutf_agent.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp liblutf_agent.so $(DESTDIR)$(libdir)/lustre/tests/lutf/
+       cp *.py $(DESTDIR)$(libdir)/lustre/tests/lutf/
+
diff --git a/lustre/tests/lutf/src/liblutf_agent.c b/lustre/tests/lutf/src/liblutf_agent.c
new file mode 100644 (file)
index 0000000..970c6c6
--- /dev/null
@@ -0,0 +1,675 @@
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <assert.h>
+#include "cyaml.h"
+#include "lutf_agent.h"
+#include "lutf.h"
+#include "lutf_python.h"
+
+static pthread_mutex_t agent_array_mutex;
+static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
+/* TODO: this is probably not thread safe */
+static char agent_state_str[128];
+
+extern bool g_agent_enable_hb;
+extern struct in_addr g_local_ip;
+
+#define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
+
+#define MUTEX_LOCK(x) \
+  pthread_mutex_lock(x)
+
+#define MUTEX_UNLOCK(x) \
+  pthread_mutex_unlock(x)
+
+char *get_local_ip()
+{
+       return inet_ntoa(g_local_ip);
+}
+
+void release_agent_blk(lutf_agent_blk_t *agent)
+{
+       /* release the agent blk mutex */
+       MUTEX_LOCK(&agent->mutex);
+       if (agent) {
+               assert(agent->ref_count != 0);
+               agent->ref_count--;
+       }
+       MUTEX_UNLOCK(&agent->mutex);
+}
+
+void acquire_agent_blk(lutf_agent_blk_t *agent)
+{
+       /* acquire the agent blk mutex */
+       MUTEX_LOCK(&agent->mutex);
+       if (agent)
+               agent->ref_count++;
+       MUTEX_UNLOCK(&agent->mutex);
+}
+
+char *agent_state2str(lutf_agent_blk_t *agent)
+{
+       if (!agent)
+               return "NULL PARAMETER";
+
+       sprintf(agent_state_str, "%s%s%s%s",
+               (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
+               (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
+               (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
+               (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
+
+       return agent_state_str;
+}
+
+static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
+                                               struct sockaddr_in *addr)
+{
+       int i;
+       lutf_agent_blk_t *agent;
+
+       if (!addr)
+               return NULL;
+
+       MUTEX_LOCK(&agent_array_mutex);
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = list[i];
+               if ((agent) &&
+                   (agent->addr.sin_addr.s_addr ==
+                    addr->sin_addr.s_addr)) {
+                       MUTEX_UNLOCK(&agent_array_mutex);
+                       return agent;
+               }
+       }
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       return NULL;
+}
+
+int get_next_active_agent(int idx, lutf_agent_blk_t **out)
+{
+       int i = idx;
+       lutf_agent_blk_t *agent = NULL;
+
+       if (idx >= MAX_NUM_AGENTS)
+               goto out;
+
+       MUTEX_LOCK(&agent_array_mutex);
+       for (i = idx; i < MAX_NUM_AGENTS; i++) {
+               agent = agent_live_list[i];
+               if (agent) {
+                       i++;
+                       acquire_agent_blk(agent);
+                       break;
+               }
+       }
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+out:
+       *out = agent;
+
+       return i;
+}
+
+lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
+{
+       lutf_agent_blk_t *agent;
+       agent = find_agent_blk_by_addr(agent_live_list, addr);
+       if (!agent)
+               return find_free_agent_blk(addr);
+
+       MUTEX_LOCK(&agent_array_mutex);
+       acquire_agent_blk(agent);
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       return agent;
+}
+
+int lutf_agent_get_highest_fd(void)
+{
+       lutf_agent_blk_t *agent;
+       int iMaxFd = INVALID_TCP_SOCKET;
+       int i;
+
+       MUTEX_LOCK(&agent_array_mutex);
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = agent_live_list[i];
+               if (agent) {
+                       if (agent->iFileDesc > iMaxFd)
+                               iMaxFd = agent->iFileDesc;
+                       if (agent->iRpcFd > iMaxFd)
+                               iMaxFd = agent->iRpcFd;
+               }
+       }
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       return iMaxFd;
+}
+
+void agent_disable_hb(void)
+{
+       g_agent_enable_hb = false;
+}
+
+void agent_enable_hb(void)
+{
+       g_agent_enable_hb = true;
+}
+
+lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
+{
+       int i = 0;
+       lutf_agent_blk_t *agent;
+
+       /* grab the lock for the array */
+       MUTEX_LOCK(&agent_array_mutex);
+
+       /* iterate through the array to find a free entry */
+       while (agent_live_list[i] != NULL)
+               i++;
+
+       if (i >= MAX_NUM_AGENTS) {
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return NULL;
+       }
+
+       /* allocate a new agent blk and assign it to that entry */
+       agent = calloc(sizeof(char),
+               sizeof(lutf_agent_blk_t));
+       if (!agent) {
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return NULL;
+       }
+
+       gettimeofday(&agent->time_stamp, NULL);
+       agent->id = i;
+       agent->iFileDesc = INVALID_TCP_SOCKET;
+       agent->iRpcFd = INVALID_TCP_SOCKET;
+       agent->addr = *addr;
+       set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
+
+       pthread_mutex_init(&agent->mutex, NULL);
+       acquire_agent_blk(agent);
+
+       /* assign to array */
+       agent_live_list[i] = agent;
+
+       /* release the array mutex */
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       /* return the agent blk */
+       return agent;
+}
+
+lutf_agent_blk_t *find_agent_blk_by_id(int idx)
+{
+       lutf_agent_blk_t *agent;
+
+       if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
+               return NULL;
+
+       /* grab the array mutex */
+       MUTEX_LOCK(&agent_array_mutex);
+
+       /* if the blk is non null grab the mutex.
+        * possibly block until previous user is done
+        */
+       if (agent_live_list[idx] == NULL) {
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return NULL;
+       }
+
+       agent = agent_live_list[idx];
+
+       acquire_agent_blk(agent);
+
+       /* release the array mutex */
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       /* return the agent blk */
+       return agent;
+}
+
+void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
+{
+       MUTEX_LOCK(&agent->mutex);
+       agent->state |= state;
+       MUTEX_UNLOCK(&agent->mutex);
+}
+
+void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
+{
+       bool zombie = false;
+
+       MUTEX_LOCK(&agent->mutex);
+       agent->state &= ~state;
+       if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
+           (agent->state & LUTF_AGENT_ZOMBIE))
+               zombie = true;
+       MUTEX_UNLOCK(&agent->mutex);
+
+       if (zombie)
+               free_agent_blk(agent->id);
+}
+
+void free_agent_blk(int id)
+{
+       lutf_agent_blk_t *agent;
+
+       /* grab the array mutex */
+       MUTEX_LOCK(&agent_array_mutex);
+
+       /* if the blk is non null grab the mutex.
+        * possibly block until previous user is done
+        */
+       if (agent_live_list[id] == NULL) {
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return;
+       }
+
+       agent = agent_live_list[id];
+
+       MUTEX_LOCK(&agent->mutex);
+       if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               PDEBUG("delay deleting agent %s\n", agent->name);
+               set_agent_state(agent, LUTF_AGENT_ZOMBIE);
+               return;
+       }
+       MUTEX_UNLOCK(&agent->mutex);
+
+       agent_live_list[id] = NULL;
+
+       /* release the array mutex */
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       /* free the block */
+       free(agent);
+}
+
+char *agent_ip2str(lutf_agent_blk_t *agent)
+{
+       if (!agent)
+               return NULL;
+
+       return inet_ntoa(agent->addr.sin_addr);
+}
+
+int get_num_agents(void)
+{
+       int i;
+       int num = 0;
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               if (agent_live_list[i] != NULL)
+                       num++;
+       }
+
+       return num;
+}
+
+/* no lock version of the function */
+static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
+{
+       lutf_agent_blk_t *agent;
+       int i;
+
+       if (!name)
+               return NULL;
+
+       MUTEX_LOCK(&agent_array_mutex);
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = agent_live_list[i];
+               if ((agent) &&
+                   ((strcmp(agent->name, name) == 0) ||
+                    (strcmp(name, TEST_ROLE_GRC) == 0))) {
+                       break;
+               }
+       }
+
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       /* return the agent blk */
+       return agent;
+}
+
+lutf_agent_blk_t *find_agent_blk_by_name(char *name)
+{
+       lutf_agent_blk_t *agent;
+
+       agent = find_agent_blk_by_name_nl(name);
+       if (agent)
+               acquire_agent_blk(agent);
+
+       /* return the agent blk */
+       return agent;
+}
+
+lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
+{
+       lutf_agent_blk_t *agent;
+       int i;
+       struct sockaddr_in addr;
+
+       if (!ip)
+               return NULL;
+
+       inet_aton(ip, &addr.sin_addr);
+
+       /* grab the array mutex */
+       MUTEX_LOCK(&agent_array_mutex);
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = agent_live_list[i];
+               if ((agent) && (agent->addr.sin_addr.s_addr ==
+                               addr.sin_addr.s_addr))
+                       break;
+       }
+
+       if (agent)
+               acquire_agent_blk(agent);
+
+       /* release the array mutex */
+       MUTEX_UNLOCK(&agent_array_mutex);
+
+       /* return the agent blk */
+       return agent;
+}
+
+void agent_hb_check(struct timeval *t, lutf_type_t me)
+{
+       lutf_agent_blk_t *agent;
+       int i;
+
+       /* grab the array mutex */
+       MUTEX_LOCK(&agent_array_mutex);
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = agent_live_list[i];
+               if (agent && agent->node_type != me) {
+                       acquire_agent_blk(agent);
+                       if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
+                               int agent_id = agent->id;
+                               /* agent didn't send a HB move to dead
+                                * list
+                                */
+                               PERROR("agent %s presumed dead", agent->name);
+                               release_agent_blk(agent);
+                               MUTEX_UNLOCK(&agent_array_mutex);
+                               /* free_agent_blk() grabs the mutex */
+                               free_agent_blk(agent_id);
+                               MUTEX_LOCK(&agent_array_mutex);
+                               continue;
+                       }
+                       release_agent_blk(agent);
+               }
+       }
+
+       /* release the array mutex */
+       MUTEX_UNLOCK(&agent_array_mutex);
+}
+
+lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
+{
+       struct timeval start;
+       struct timeval now;
+       struct cYAML *a;
+       bool found = false;
+       lutf_agent_blk_t *agent;
+
+       gettimeofday(&start, NULL);
+       gettimeofday(&now, NULL);
+
+       if (!agents) {
+               PDEBUG("No agent to wait for");
+               return EN_LUTF_RC_OK;
+       }
+
+       PDEBUG("Start waiting for Agents");
+
+       while (now.tv_sec - start.tv_sec < timeout && !found) {
+               found = true;
+               PDEBUG("Waiting for Agents");
+               while (cYAML_get_next_seq_item(agents, &a) != NULL) {
+                       PDEBUG("Looking up: %s", a->cy_valuestring);
+                       if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
+                               found = false;
+                               break;
+                       } else {
+                               PDEBUG("agent %s found\n", agent->name);
+                               release_agent_blk(agent);
+                       }
+               }
+               if (!found)
+                       sleep(1);
+               gettimeofday(&now, NULL);
+       }
+
+       return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
+}
+
+int get_num_agents_remote(char *masterIP, int masterPort)
+{
+       lutf_rc_t rc;
+       lutf_msg_num_agents_query_t msg;
+       lutf_msg_num_agents_query_t *msg_p;
+       lutf_message_hdr_t hdr;
+       lutf_message_hdr_t *hdr_p;
+       int remoteSocket = INVALID_TCP_SOCKET;
+       struct in_addr addr;
+       char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
+
+       if (!recvBuf) {
+               PERROR("out of memory");
+               rc = EN_LUTF_RC_FAIL;
+               goto out;
+       }
+
+       if (!inet_aton(masterIP, &addr)) {
+               PERROR("bad master IP = %s", masterIP);
+               rc = EN_LUTF_RC_FAIL;
+               goto out;
+       }
+
+       /* in network byte order, convert so we can have a
+        * uniform API
+        */
+       remoteSocket = establishTCPConnection(addr.s_addr,
+                                               htons(masterPort),
+                                               false, false);
+       if (remoteSocket < 0) {
+               PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
+               rc = remoteSocket;
+               goto out;
+       }
+
+       rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
+       if (rc)
+               goto out;
+
+       rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
+                           TCP_READ_TIMEOUT_SEC);
+       if (rc) {
+               PERROR("failed to receive response");
+               goto out;
+       }
+
+       hdr_p = (lutf_message_hdr_t *)recvBuf;
+       msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
+
+       if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
+               PERROR("Unexpected message. Waiting for num agents received %d",
+                      hdr_p->type);
+               rc = EN_LUTF_RC_FAIL;
+               goto out;
+       }
+
+       rc = msg_p->num_agents;
+
+out:
+       closeTcpConnection(remoteSocket);
+       free(recvBuf);
+       return rc;
+}
+
+lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
+{
+       lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
+       lutf_agent_blk_t *agent_blk;
+       char *default_rsp;
+       lutf_message_hdr_t hdr;
+       char *recvBuf = NULL;
+       int msg_size;
+
+       if (!agent || !yaml || !rsp)
+               goto fail_rpc;
+
+       msg_size = strlen(yaml) + 1;
+
+       PDEBUG("sending rpc request\n%s", yaml);
+
+       agent_blk = find_agent_blk_by_name(agent);
+       if (!agent_blk) {
+               PERROR("Can't find agent with name: %s", agent);
+               goto fail_rpc_no_agent;
+       }
+
+       MUTEX_LOCK(&agent_blk->mutex);
+       if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
+               MUTEX_UNLOCK(&agent_blk->mutex);
+               PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
+                      agent_blk->name,
+                      inet_ntoa(agent_blk->addr.sin_addr),
+                      agent_blk->listen_port);
+               /* in network byte order, convert so we can have a uniform API */
+               agent_blk->iRpcFd = establishTCPConnection(
+                               agent_blk->addr.sin_addr.s_addr,
+                               htons(agent_blk->listen_port),
+                               false, false);
+               if (agent_blk->iRpcFd < 0)
+                       goto fail_rpc;
+               set_agent_state(agent_blk,
+                               LUTF_AGENT_RPC_CHANNEL_CONNECTED);
+       } else {
+               MUTEX_UNLOCK(&agent_blk->mutex);
+       }
+
+       set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+
+       rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
+                          EN_MSG_TYPE_RPC_REQUEST);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to send rpc message: %s", yaml);
+               goto fail_rpc;
+       }
+
+       /* wait for the response */
+       rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
+                           sizeof(hdr), timeout);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to recv rpc header in timeout %d",
+                      timeout);
+               goto fail_rpc;
+       }
+
+       if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
+           ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
+               PERROR("Bad response. version %d, type:%d\n",
+                      hdr.type, hdr.version);
+               goto fail_rpc;
+       }
+
+       recvBuf = calloc(ntohl(hdr.len), 1);
+       if (!recvBuf) {
+               PERROR("Failed to allocate buffer to recv rpc response");
+               goto fail_rpc;
+       }
+
+       rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to recv rpc body in timeout %d", timeout);
+               goto fail_rpc;
+       }
+
+       /*
+        * once recvBuf is given back to the caller, it's expected that
+        * the caller will manage the memory and free when done. This is
+        * mainly called from python. The SWIG wrapper frees the memory
+        * appropriately.
+        */
+       *rsp = recvBuf;
+       release_agent_blk(agent_blk);
+
+       unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+
+       return EN_LUTF_RC_OK;
+
+fail_rpc:
+       release_agent_blk(agent_blk);
+       unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       if (recvBuf)
+               free(recvBuf);
+       msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
+               strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
+fail_rpc_no_agent:
+       default_rsp = calloc(msg_size, 1);
+       if (!default_rsp) {
+               PERROR("Failed to allocate buffer for default response");
+               *rsp = NULL;
+       } else {
+               /* the source for the response would be the agent we sent
+                * to and the destination is me
+                */
+               snprintf(default_rsp, msg_size,
+                        DEFAULT_RPC_RSP, agent,
+                        g_lutf_cfg.l_info.hb_info.node_name);
+               *rsp = default_rsp;
+       }
+
+       return rc;
+}
+
+lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
+{
+       lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
+       lutf_agent_blk_t *agent_blk;
+       int msg_size;
+
+       if (!agent || !yaml)
+               goto out;
+
+       msg_size = strlen(yaml) + 1;
+
+       agent_blk = find_agent_blk_by_name_nl(agent);
+       if (!agent_blk) {
+               PERROR("Can't find agent with name: %s", agent);
+               goto out;
+       }
+
+       MUTEX_LOCK(&agent_blk->mutex);
+       if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
+               MUTEX_UNLOCK(&agent_blk->mutex);
+               PERROR("agent_blk %s doesn't have an RPC channel",
+                      agent_blk->name);
+               goto out;
+       }
+       MUTEX_UNLOCK(&agent_blk->mutex);
+
+       PDEBUG("sending rpc response\n%s", yaml);
+       rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
+                          EN_MSG_TYPE_RPC_RESPONSE);
+
+out:
+       return rc;
+}
+
+void agent_init(void)
+{
+       pthread_mutex_init(&agent_array_mutex, NULL);
+}
diff --git a/lustre/tests/lutf/src/liblutf_connect.c b/lustre/tests/lutf/src/liblutf_connect.c
new file mode 100644 (file)
index 0000000..796571a
--- /dev/null
@@ -0,0 +1,331 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <errno.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <fcntl.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/time.h>
+#include "lutf.h"
+#include "lutf_message.h"
+
+static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
+                                     int iSAlen, int iNsec)
+{
+       int iN, iError = 0;
+       int iLen;
+       fd_set rset, wset;
+       struct timeval tval;
+
+       if ((iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen)) < 0) {
+               if (errno != EINPROGRESS) {
+                       PERROR("Connect Failed: %s:%d", strerror(errno), errno);
+                       return EN_LUTF_RC_FAIL;
+               }
+       }
+
+       if (iN != 0) {
+               FD_ZERO(&rset);
+               FD_SET(iSockFd, &rset);
+               wset = rset;
+               tval.tv_sec = iNsec;
+               tval.tv_usec = 0;
+
+               if ((iN = select(iSockFd+1, &rset, &wset, NULL,
+                                iNsec ? &tval : NULL)) == 0) {
+                       errno = ETIMEDOUT;
+                       PERROR("Select timed out");
+                       return EN_LUTF_RC_FAIL;
+               }
+
+               if (iN < 0)
+                       return EN_LUTF_RC_FAIL;
+
+               if (FD_ISSET(iSockFd, &rset) || FD_ISSET(iSockFd, &wset)) {
+                       iLen = sizeof(iError);
+                       if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError, (socklen_t *)&iLen) < 0) {
+                               PERROR("getsockopt failed indicating connect failure, errno= %d", errno);
+                               return EN_LUTF_RC_FAIL;
+                       }
+               } else {
+                       PERROR("select error: sockfd not set");
+                       return EN_LUTF_RC_FAIL;
+               }
+       }
+
+       /* There was some error when connecting */
+       if (iError) {
+               errno = iError;
+               PERROR("Error on connect. errno = %s", strerror(errno));
+               return EN_LUTF_RC_FAIL;
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+int establishTCPConnection(unsigned long uiAddress,
+                          int iPort,
+                          bool b_non_block,
+                          bool endian)
+{
+       int iOption = 1, iFlags;
+       int rsocket;
+       struct sockaddr_in tm_addr;
+       lutf_rc_t eRc = EN_LUTF_RC_OK;
+
+       /* Create TCP socket */
+       if ((rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))
+            == -1)
+               return EN_LUTF_RC_FAIL;
+
+       /* Turn off Nagle's algorithm for this TCP socket. */
+       setsockopt(rsocket, IPPROTO_TCP, TCP_NODELAY, (void *)&iOption,
+                  sizeof(iOption));
+
+       iFlags = 1;
+       if (setsockopt(rsocket, SOL_SOCKET, SO_REUSEADDR, (void *)&iFlags,
+                      sizeof(iFlags)) < 0) {
+               /*  Cannot change the socket options.  */
+               close(rsocket);
+               return EN_LUTF_RC_FAIL;
+       }
+
+       iFlags = fcntl(rsocket, F_GETFL, 0);
+       if (b_non_block)
+               fcntl(rsocket, F_SETFL, iFlags | O_NONBLOCK);
+       else
+               fcntl(rsocket, F_SETFL, iFlags & (~O_NONBLOCK));
+
+       /* Set address parameters for TCP connection */
+       bzero((char *) &tm_addr, sizeof(tm_addr));
+       tm_addr.sin_addr.s_addr = (endian) ? htonl(uiAddress) : uiAddress;
+       tm_addr.sin_port = (endian) ? htons(iPort) : iPort;
+       tm_addr.sin_family = AF_INET;
+
+       if ((eRc = doNonBlockingConnect(rsocket,
+                                       (struct sockaddr *)&tm_addr,
+                                       sizeof(tm_addr),
+                                       SOCKET_CONN_TIMEOUT_SEC))
+           != EN_LUTF_RC_OK) {
+               close(rsocket);
+               return eRc;
+       }
+
+       return rsocket;
+}
+
+lutf_rc_t closeTcpConnection(int iTcpSocket)
+{
+       int rc;
+
+       PDEBUG("closing socket %d", iTcpSocket);
+       rc = close(iTcpSocket);
+       if (!rc && errno != EINPROGRESS && errno != ECONNRESET) {
+               PERROR("failed to close %d:%d\n", iTcpSocket, errno);
+               return EN_LUTF_RC_FAIL;
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+/*
+ * sendTcpMessage
+ *   Send a TCP message to the specified TCP socket.
+ *
+ * Parameters:      iTcpSocket - Socket file descriptor
+ *                  pcBody - TCP message to send.
+ *                  iBodySize - size of the body
+ *
+ */
+lutf_rc_t sendTcpMessage(int iTcpSocket, char *pcBody, int iBodySize)
+{
+       size_t tNleft;
+       ssize_t tNwritten;
+       char *pcCur;
+
+       if (iTcpSocket == INVALID_TCP_SOCKET)
+               return(EN_LUTF_RC_FAIL);
+
+       /* Start writing bytes to the socket and keep writing until we have
+        * the requested number of bytes sent.
+        */
+       pcCur = (char *)pcBody;
+       tNleft = iBodySize;
+
+       while (tNleft > 0) {
+               /*  Send as many bytes, up to current maximum, as we can.  */
+               tNwritten = write(iTcpSocket, pcCur, tNleft);
+
+               if (tNwritten < 0) {
+                       if (errno == EINTR) {
+                               /* We were interrupted, but this is not an
+                                * error condition.
+                                */
+                               tNwritten = 0;
+                       } else {
+                               /* System error has occurred.  */
+                               PERROR("Failed to send message (%d, %p, %d, %u)  %s:%d",
+                                      iTcpSocket, pcBody, iBodySize, tNwritten,
+                                      strerror(errno), errno);
+                               return EN_LUTF_RC_SYS_ERR;
+                       }
+               }
+
+               tNleft -= tNwritten;
+               pcCur += tNwritten;
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+/*
+ * populateMsgHdr
+ *     populate the LUTF message header with the passed in information.
+ *
+ * Parameters:      rsocket - Socket file descriptor
+ *                  msg_hdr - pointer to the message header.
+ *                  msg_type - type of message
+ *                  msg_size - message size
+ *                  lutf_version_number - version number
+ *
+ */
+lutf_rc_t populateMsgHdr(int rsocket, char *msg_hdr,
+                        int msg_type, int msg_size,
+                        int lutf_version_number)
+{
+       lutf_message_hdr_t *hdr = NULL;
+       struct sockaddr_in sock;
+       int len = sizeof(sock);
+       int rc;
+
+       if (rsocket == INVALID_TCP_SOCKET ||
+           msg_hdr == NULL) {
+               PERROR("bad parameter: hdr = %p, socket = %d",
+                      msg_hdr, rsocket);
+               return EN_LUTF_RC_FAIL;
+       }
+
+       hdr = (lutf_message_hdr_t *)msg_hdr;
+
+       /* get the local IP address we are connected on */
+       rc = getsockname(rsocket,
+                       (struct sockaddr *)&sock,
+                       (socklen_t *)&len);
+       if (rc) {
+               PERROR("getsockname failure %s:%s:%d",
+                      strerror(errno), strerror(rc), rc);
+               return EN_LUTF_RC_FAIL;
+       }
+
+       hdr->type = htonl(msg_type);
+       hdr->len = htonl(msg_size);
+       hdr->ip.s_addr = sock.sin_addr.s_addr;
+       hdr->version = htonl(lutf_version_number);
+
+       return EN_LUTF_RC_OK;
+}
+
+lutf_rc_t readTcpMessage(int iFd, char *pcBuffer,
+                        int iBufferSize, int iTimeout)
+{
+       size_t tNleft;
+       ssize_t tNread;
+       char *pcCur;
+       struct timeval sTimeout;
+       int iFlags;
+
+       /* Grab a copy of the client's file descriptor
+        * (and make sure it isn't -1).
+        */
+       if (iFd == -1)
+               return EN_LUTF_RC_CLIENT_CLOSED;
+
+       /* set the timeout */
+       if (iTimeout) {
+               sTimeout.tv_sec = iTimeout;
+               sTimeout.tv_usec = 0;
+               setsockopt(iFd, SOL_SOCKET, SO_RCVTIMEO, (void *)&sTimeout,
+                               sizeof(sTimeout));
+               setsockopt(iFd, SOL_SOCKET, SO_SNDTIMEO, (void *)&sTimeout,
+                               sizeof(sTimeout));
+
+               iFlags = fcntl(iFd, F_GETFL, 0);
+               fcntl(iFd, F_SETFL, iFlags & (~O_NONBLOCK));
+       } else {
+               /* if no timeout specified do a non blocking read */
+               iFlags = fcntl(iFd, F_GETFL, 0);
+               fcntl(iFd, F_SETFL, iFlags | O_NONBLOCK);
+       }
+
+       /* Start reading in bytes from the socket and keep reading until we have
+        * the requested number of bytes or EOF occurs.
+        */
+       pcCur = pcBuffer;
+       tNleft = iBufferSize;
+       while (tNleft > 0) {
+               /*  Get as many bytes, up to current maximum as we can.  */
+               tNread = read(iFd, pcCur, tNleft);
+
+               if (tNread < 0) {
+                       if (errno == EINTR) {
+                               /*  We were interrupted, but this is not an error condition.  */
+                               tNread = 0;
+                       } else if ((errno == EAGAIN) && (!iTimeout)) {
+                               return EN_LUTF_RC_SOCKET_FAIL;
+                       } else {
+                               /*  System error has occurred. */
+                               return EN_LUTF_RC_SOCKET_FAIL;
+                       }
+               } else {
+                       if (tNread == 0) {
+                               /* End of file encountered. This is most
+                                * likely the client closing their end of the
+                                * socket.
+                                */
+                               return EN_LUTF_RC_SOCKET_FAIL;
+                       }
+               }
+
+               tNleft -= tNread;
+               pcCur += tNread;
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+lutf_rc_t lutf_send_msg(int fd, char *msg, size_t msg_size,
+                       lutf_msg_type_t type)
+{
+       lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
+       lutf_message_hdr_t hdr;
+
+       rc = populateMsgHdr(fd, (char *)&hdr, type,
+                           msg_size, LUTF_VERSION_NUMBER);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to populate message header");
+               return rc;
+       }
+
+       rc = sendTcpMessage(fd, (char *)&hdr, sizeof(hdr));
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to send msg header");
+               return rc;
+       }
+
+       if (msg_size) {
+               rc = sendTcpMessage(fd, msg, msg_size);
+               if (rc != EN_LUTF_RC_OK) {
+                       PERROR("Failed to send msg body");
+                       return rc;
+               }
+       }
+
+       return rc;
+}
diff --git a/lustre/tests/lutf/src/liblutf_global.c b/lustre/tests/lutf/src/liblutf_global.c
new file mode 100644 (file)
index 0000000..0225516
--- /dev/null
@@ -0,0 +1,91 @@
+#include <pthread.h>
+#include <time.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include "lutf.h"
+
+char *get_lutf_path(void)
+{
+       return g_lutf_cfg.lutf_path;
+}
+
+char *get_py_path(void)
+{
+       return g_lutf_cfg.py_path;
+}
+
+char *get_master_name(void)
+{
+       return g_lutf_cfg.master_name;
+}
+
+char *get_suite_name(void)
+{
+       return g_lutf_cfg.suite;
+}
+
+char *get_script_name(void)
+{
+       return g_lutf_cfg.script;
+}
+
+char *get_matching_pattern(void)
+{
+       return g_lutf_cfg.pattern;
+}
+
+int get_master_listen_port(void)
+{
+       return g_lutf_cfg.l_info.listen_port;
+}
+
+char *get_node_name(void)
+{
+       return g_lutf_cfg.l_info.hb_info.node_name;
+}
+
+int get_agent_telnet_port(void)
+{
+       return g_lutf_cfg.l_info.hb_info.agent_telnet_port;
+}
+
+char *get_master_address(void)
+{
+       return inet_ntoa(g_lutf_cfg.l_info.hb_info.master_address.sin_addr);
+}
+
+int get_master_port(void)
+{
+       return g_lutf_cfg.l_info.hb_info.master_address.sin_port;
+}
+
+lutf_run_mode_t get_lutf_mode(void)
+{
+       return g_lutf_cfg.shell;
+}
+
+lutf_type_t get_lutf_type(void)
+{
+       return g_lutf_cfg.l_info.type;
+}
+
+char *get_lutf_results_file_path(void)
+{
+       return g_lutf_cfg.results_file;
+}
+
+char *get_lutf_cfg_file_path(void)
+{
+       return g_lutf_cfg.cfg_path;
+}
+
+char *get_lutf_tmp_dir(void)
+{
+       return g_lutf_cfg.tmp_dir;
+}
diff --git a/lustre/tests/lutf/src/lutf.c b/lustre/tests/lutf/src/lutf.c
new file mode 100644 (file)
index 0000000..e174fe9
--- /dev/null
@@ -0,0 +1,519 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <errno.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <fcntl.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include "lnetconfig/cyaml.h"
+#include "lutf_listener.h"
+#include "lutf_message.h"
+#include "lutf_python.h"
+#include "lutf.h"
+
+#define HB_TIMEOUT     2
+
+struct in_addr g_local_ip;
+FILE *out;
+char *outlog;
+
+/*externs needed by getopt lib*/
+extern char *optarg;
+extern int optind;
+
+static void
+lutf_help_usage(const struct option *long_options, const char *description[])
+{
+       int i = 0;
+
+       fprintf(stderr, BOLDCYAN "LUTF Runs in two modes: "
+               RESET BOLDMAGENTA "Master" RESET BOLDCYAN " or " BOLDRED "Agent\n\n"
+               BOLDMAGENTA
+               "Master Mode\n"
+               "    . Runs on the Test Master node and controls all agents\n"
+               BOLDRED
+               "Agent Mode:\n"
+               "    . Runs on the Nodes Under Test\n\n"
+               BOLDGREEN
+               "Look at lutf/python/config/lutf_cfg_sample.yaml for a sample "
+               "LUTF configuration\n\n"
+               RESET
+               "Options:\n");
+
+       while ((long_options[i].name != NULL) && (description[i] != NULL)) {
+               fprintf(stderr, "\t-%c or --%s %s\n",
+                       (char) long_options[i].val,
+                       long_options[i].name,
+                       description[i]);
+               i++;
+       }
+
+       fprintf(stderr, "\n");
+}
+
+static struct cYAML *get_value(struct cYAML *head, char *key)
+{
+       struct cYAML *child = head;
+
+       while (child != NULL) {
+               if (strcmp(child->cy_string, key) == 0)
+                       return child;
+               child = child->cy_next;
+       }
+
+       return NULL;
+}
+
+static
+lutf_rc_t hostname_to_ip(char *hostname, char *ip, int len)
+{
+       struct addrinfo hints, *servinfo, *p;
+       struct sockaddr_in *h;
+       int rv;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
+       hints.ai_socktype = SOCK_STREAM;
+
+       if ((rv = getaddrinfo(hostname, "http", &hints, &servinfo)) != 0) {
+               PERROR("getaddrinfo: %s\n", gai_strerror(rv));
+               return EN_LUTF_RC_BAD_ADDR;
+       }
+
+       // loop through all the results and connect to the first we can
+       memset(ip, 0, len);
+       for (p = servinfo; p != NULL; p = p->ai_next) {
+               h = (struct sockaddr_in *) p->ai_addr;
+               strncpy(ip, inet_ntoa(h->sin_addr), len-1);
+       }
+
+       freeaddrinfo(servinfo); // all done with this structure
+       return EN_LUTF_RC_OK;
+}
+
+static
+lutf_rc_t extract_config_parameters(struct cYAML *config_tree,
+                                   lutf_config_params_t *cfg,
+                                   char **elem)
+{
+       struct in_addr addr;
+       struct cYAML *head;
+       struct cYAML *tmp;
+       char maddr[24];
+       lutf_rc_t rc;
+
+       head = config_tree->cy_child;
+
+       if (strcmp(head->cy_string, "lutf") != 0) {
+               *elem = "lutf";
+               return EN_LUTF_RC_BAD_PARAM;
+       }
+
+       /* go  to the list of elements we need to browse */
+       head = head->cy_child;
+
+       tmp = get_value(head, "shell");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING) {
+                       if (strcmp(tmp->cy_valuestring,
+                                  INTERACTIVE) == 0) {
+                               cfg->shell = EN_LUTF_RUN_INTERACTIVE;
+                       } else if (strcmp(tmp->cy_valuestring,
+                                       BATCH) == 0) {
+                               cfg->shell = EN_LUTF_RUN_BATCH;
+                       } else if (strcmp(tmp->cy_valuestring,
+                                       DAEMON) == 0) {
+                               cfg->shell = EN_LUTF_RUN_DAEMON;
+                       } else {
+                               *elem = "shell";
+                               return EN_LUTF_RC_BAD_PARAM;
+                       }
+               } else {
+                       *elem = "shell";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               *elem = "shell";
+               return EN_LUTF_RC_MISSING_PARAM;
+       }
+
+       tmp = get_value(head, "agent");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_FALSE)
+                       cfg->l_info.type = EN_LUTF_MASTER;
+               else if (tmp->cy_type == CYAML_TYPE_TRUE)
+                       cfg->l_info.type = EN_LUTF_AGENT;
+               else {
+                       *elem = "agent";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->l_info.type = EN_LUTF_MASTER;
+       }
+
+       tmp = get_value(head, "telnet-port");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_NUMBER)
+                       cfg->l_info.hb_info.agent_telnet_port = tmp->cy_valueint;
+               else {
+                       *elem = "telnet-port";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->l_info.hb_info.agent_telnet_port = -1;
+       }
+
+       tmp = get_value(head, "master-address");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING) {
+                       if (!inet_aton(tmp->cy_valuestring, &addr)) {
+                               /* maybe it's a host name so let's try
+                                * that out
+                                */
+                               rc = EN_LUTF_RC_BAD_ADDR;
+                               if ((rc = hostname_to_ip(tmp->cy_valuestring, maddr,
+                                                        sizeof(maddr)))
+                                   != EN_LUTF_RC_OK) {
+                                       *elem = "master-address";
+                                       return rc;
+                               } else if (!inet_aton(maddr, &addr)) {
+                                       *elem = "master-address";
+                                       return rc;
+                               }
+                       }
+                       cfg->l_info.hb_info.master_address.sin_addr = addr;
+               } else {
+                       *elem = "master-address";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else if (cfg->l_info.type == EN_LUTF_AGENT) {
+               *elem = "master-address";
+               return EN_LUTF_RC_MISSING_PARAM;
+       }
+
+       tmp = get_value(head, "master-port");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_NUMBER) {
+                       cfg->l_info.hb_info.master_address.sin_port = tmp->cy_valueint;
+                       cfg->l_info.listen_port = tmp->cy_valueint;
+               } else {
+                       *elem = "master-port";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->l_info.hb_info.master_address.sin_port = DEFAULT_MASTER_PORT;
+               cfg->l_info.listen_port = DEFAULT_MASTER_PORT;
+       }
+       cfg->l_info.hb_info.master_address.sin_family = AF_INET;
+
+       tmp = get_value(head, "lutf-path");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       cfg->lutf_path = tmp->cy_valuestring;
+               else {
+                       *elem = "lutf-path";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               *elem = "lutf-path";
+               return EN_LUTF_RC_MISSING_PARAM;
+       }
+
+       tmp = get_value(head, "py-path");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       cfg->py_path = tmp->cy_valuestring;
+               else {
+                       *elem = "py-path";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       }
+
+       tmp = get_value(head, "node-name");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING) {
+                       strncpy(cfg->l_info.hb_info.node_name, tmp->cy_valuestring,
+                               MAX_STR_LEN);
+                       cfg->l_info.hb_info.node_name[MAX_STR_LEN - 1] = '\0';
+               } else {
+                       *elem = "node-name";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               strncpy(cfg->l_info.hb_info.node_name, TEST_ROLE_GRC,
+                       MAX_STR_LEN);
+               cfg->l_info.hb_info.node_name[MAX_STR_LEN - 1] = '\0';
+       }
+
+       tmp = get_value(head, "master-name");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       cfg->master_name = tmp->cy_valuestring;
+               else
+                       return EN_LUTF_RC_BAD_PARAM;
+       } else if (cfg->l_info.type == EN_LUTF_AGENT) {
+               *elem = "master-name";
+               return EN_LUTF_RC_MISSING_PARAM;
+       }
+
+       tmp = get_value(head, "suite");
+       if (tmp && cfg->l_info.type == EN_LUTF_MASTER) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       if (strlen(tmp->cy_valuestring) > 0)
+                               cfg->suite = tmp->cy_valuestring;
+                       else
+                               cfg->suite = NULL;
+               else {
+                       *elem = "suite";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       }
+
+       tmp = get_value(head, "script");
+       if (tmp && cfg->l_info.type == EN_LUTF_MASTER) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       if (strlen(tmp->cy_valuestring) > 0)
+                               cfg->script = tmp->cy_valuestring;
+                       else
+                               cfg->script = NULL;
+               else {
+                       *elem = "script";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       }
+
+       if (!cfg->suite && cfg->script) {
+               *elem = "suite";
+               return EN_LUTF_RC_BAD_PARAM;
+       }
+
+       tmp = get_value(head, "pattern");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       if (strlen(tmp->cy_valuestring) > 0)
+                               cfg->pattern = tmp->cy_valuestring;
+                       else
+                               cfg->pattern = "*";
+               else {
+                       *elem = "pattern";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->pattern = "*";
+       }
+
+       tmp = get_value(head, "results");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       cfg->results_file = tmp->cy_valuestring;
+               else {
+                       *elem = "results";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->results_file = "lutf_def_results";
+       }
+
+       tmp = get_value(head, "agent-list");
+       if (tmp) {
+               if (cYAML_is_sequence(tmp))
+                       cfg->agents = tmp;
+               else {
+                       *elem = "agent-list";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->agents = NULL;
+       }
+
+       tmp = get_value(head, "tmp-dir");
+       if (tmp) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       cfg->tmp_dir = tmp->cy_valuestring;
+               else {
+                       *elem = "tmp-dir";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       } else {
+               cfg->tmp_dir = "/tmp/lutf/";
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+int
+main(int argc, char *argv[])
+{
+       int cOpt;
+       pthread_t l_thread_id;
+       lutf_rc_t rc;
+       int trc;
+       char *config_file = NULL;
+       char *elem = NULL;
+       struct cYAML *config_tree;
+       struct cYAML *err_rc = NULL;
+
+       out = stdout;
+
+       memset(&g_lutf_cfg, 0, sizeof(g_lutf_cfg));
+
+       /* If followed by a ':', the option requires an argument*/
+       const char *const short_options = "c:h";
+       const struct option long_options[] = {
+               {.name = "config", .has_arg = required_argument, .val = 'c'},
+               {.name = "help", .has_arg = no_argument, .val = 'h'},
+               {NULL, 0, NULL, 0}
+       };
+
+       const char *description[] = {
+               /*'c'*/":\n\t\tYAML config file",
+               /*'h'*/":\n\t\tPrint this help",
+               NULL
+       };
+
+       /* sanity check */
+       if (argc < 1) {
+               lutf_help_usage(long_options, description);
+               exit(LUTF_EXIT_ERR_STARTUP);
+       }
+
+       /*now process command line arguments*/
+       if (argc > 1) {
+               while ((cOpt = getopt_long(argc, argv,
+                                          short_options,
+                                          long_options,
+                                          NULL)) != -1) {
+                       switch (cOpt) {
+                       case 'c':
+                               config_file = optarg;
+                               break;
+                       case 'h':
+                               lutf_help_usage(long_options, description);
+                               exit(LUTF_EXIT_NORMAL);
+                       default:
+                               PERROR("Bad parameter");
+                               exit(LUTF_EXIT_ERR_BAD_PARAM);
+                               break;
+                       }
+               }
+       }
+
+       if (!config_file) {
+               lutf_help_usage(long_options, description);
+               exit(LUTF_EXIT_ERR_BAD_PARAM);
+       }
+
+       g_lutf_cfg.cfg_path = config_file;
+
+       config_tree = cYAML_build_tree(config_file, NULL, 0, &err_rc, false);
+       if (!config_tree) {
+               PERROR("Failed to parse config file: %s", config_file);
+               exit(LUTF_EXIT_ERR_BAD_PARAM);
+       }
+
+       rc = extract_config_parameters(config_tree, &g_lutf_cfg, &elem);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Parsing configuration failed on %s with %s",
+                      elem, lutf_rc2str(rc));
+               exit(LUTF_EXIT_ERR_BAD_PARAM);
+       }
+
+       outlog = calloc(strlen(g_lutf_cfg.tmp_dir) + strlen(OUT_LOG_NAME) + 2, 1);
+
+       if (!outlog) {
+               PERROR("out of memory");
+               exit(LUTF_EXIT_ERR_STARTUP);
+       }
+
+       sprintf(outlog, "%s/%s", g_lutf_cfg.tmp_dir, OUT_LOG_NAME);
+
+       out = fopen(outlog, "w");
+
+       if (!out) {
+               fprintf(stderr, "Failed to open log files: %s\n",
+                       outlog);
+               exit(LUTF_EXIT_ERR_STARTUP);
+       }
+
+       if (g_lutf_cfg.shell == EN_LUTF_RUN_DAEMON) {
+               pid_t process_id = 0;
+               pid_t sid = 0;
+
+               /* create the child process */
+               process_id = fork();
+               if (process_id < 0) {
+                       PERROR("Failed to run lutf as deamon");
+                       exit(LUTF_EXIT_ERR_DEAMEON_STARTUP);
+               }
+
+               if (process_id > 0) {
+                       /*
+                        * We're in the parent process so let's kill it
+                        * off
+                        */
+                       PDEBUG("Shutting down parent process");
+                       exit(LUTF_EXIT_NORMAL);
+               }
+
+               umask(0);
+               sid = setsid();
+               if (sid < 0) {
+                       PERROR("forking child failed");
+                       exit(LUTF_EXIT_ERR_DEAMEON_STARTUP);
+               }
+
+               rc = chdir("/");
+               close(STDIN_FILENO);
+               close(STDOUT_FILENO);
+               close(STDERR_FILENO);
+               if (rc) {
+                       PERROR("chdir failed");
+                       exit(LUTF_EXIT_ERR_DEAMEON_STARTUP);
+               }
+       }
+
+       /*
+        * Spawn the listener thread if we are in Master Mode.
+        * The listener thread listens for Heart beats and deals
+        * with maintaining the health of the agents. If an agent
+        * dies and comes back again, then we know how to deal
+        * with it.
+        */
+       trc = pthread_create(&l_thread_id, NULL,
+                            lutf_listener_main,
+                            &g_lutf_cfg.l_info);
+       if (trc) {
+               PERROR("Failed to start thread");
+               exit(LUTF_EXIT_ERR_THREAD_STARTUP);
+       }
+
+       /* spawn listener thread iff running in Master mode */
+       rc = python_init();
+       if (rc) {
+               PERROR("Failed to initialize Python Module");
+               if (rc == EN_LUTF_RC_ERR_THREAD_STARTUP)
+                       exit(LUTF_EXIT_ERR_THREAD_STARTUP);
+               else
+                       exit(LUTF_EXIT_ERR_STARTUP);
+       }
+
+       pthread_join(l_thread_id, NULL);
+
+       fclose(out);
+
+       cYAML_free_tree(config_tree);
+
+       return 0;
+}
diff --git a/lustre/tests/lutf/src/lutf.h b/lustre/tests/lutf/src/lutf.h
new file mode 100644 (file)
index 0000000..79bd62d
--- /dev/null
@@ -0,0 +1,139 @@
+#ifndef LUTF_H
+#define LUTF_H
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <time.h>
+#include <sys/stat.h>
+#include "lutf_common.h"
+#include "lutf_agent.h"
+#include "lutf_message.h"
+
+extern FILE *out;
+extern char *outlog;
+
+#define OUT_LOG_NAME "lutf_out.log"
+#define OUT_PY_LOG "lutf_py.log"
+#define LARGE_LOG_FILE 400000000 /* 400 MB */
+
+time_t debugnow;
+int di;
+char debugtimestr[30];
+
+static inline void lutf_log_print(bool error, char *color1, char *color2,
+                                 char *file, int line, char *fmt, ...)
+{
+       time_t debugnow;
+       int di;
+       char debugtimestr[30];
+       struct stat st;
+       va_list args;
+
+       /* check if the log file has grown too large */
+       stat(outlog, &st);
+       if (st.st_size > LARGE_LOG_FILE)
+               out = freopen(outlog, "w", out);
+
+       time(&debugnow);
+       ctime_r(&debugnow, debugtimestr);
+       for (di = 0; di < 30; di++) {
+               if (debugtimestr[di] == '\n')
+                       debugtimestr[di] = '\0';
+       }
+
+       fprintf(out, "%s%s %s:%s:%d " RESET "%s- ", color1,
+               (error) ? "ERROR" : "", debugtimestr, file, line, color2);
+       va_start(args, fmt);
+       vfprintf(out, fmt, args);
+       va_end(args);
+       fprintf(out, RESET"\n");
+       fflush(out);
+}
+
+#define PERROR(fmt, args...) lutf_log_print(true, BOLDRED, RED, __FILE__, __LINE__, fmt, ## args)
+#define PDEBUG(fmt, args...) lutf_log_print(false, BOLDGREEN, GREEN, __FILE__, __LINE__, fmt, ## args)
+
+typedef struct hb_info_s {
+       struct sockaddr_in master_address;
+       int agent_telnet_port;
+       char node_name[MAX_STR_LEN];
+} hb_info_t;
+
+typedef struct lutf_listener_info_s {
+       lutf_type_t type;
+       int listen_port;
+       hb_info_t hb_info;
+} lutf_listener_info_t;
+
+typedef struct lutf_config_params_s {
+       lutf_listener_info_t l_info;
+       lutf_run_mode_t shell; /* run in [non]-interactive or daemon mode */
+       char *cfg_path; /* path to config file */
+       char *lutf_path; /* path to lutf */
+       char *py_path; /* other python specific paths */
+       char *master_name; /* name of master. Important if I'm an agent */
+       char *suite; /* name of suite to run. Run all if not present */
+       char *script; /* name of script to run. Suite must be specified */
+       char *pattern; /* file match pattern */
+       char *results_file; /* path to results file */
+       char *tmp_dir; /* directory to put temporary files */
+       struct cYAML *agents; /* list of agents to wait for before
+                              * starting the test
+                              */
+} lutf_config_params_t;
+
+lutf_config_params_t g_lutf_cfg;
+
+static inline char *lutf_rc2str(lutf_rc_t rc)
+{
+       char *str[] = {
+               [EN_LUTF_RC_OK] = "RC_OK",
+               [EN_LUTF_RC_FAIL*-1] = "RC_FAIL",
+               [EN_LUTF_RC_SYS_ERR*-1] = "RC_SYSTEM_ERROR",
+               [EN_LUTF_RC_BAD_VERSION*-1] = "RC_BAD_VERSION",
+               [EN_LUTF_RC_SOCKET_FAIL*-1] = "RC_SOCKET_FAIL",
+               [EN_LUTF_RC_BIND_FAILED*-1] = "RC_BIND_FAIL",
+               [EN_LUTF_RC_LISTEN_FAILED*-1] = "RC_LISTEN_FAIL",
+               [EN_LUTF_RC_CLIENT_CLOSED*-1] = "RC_CLIENT_CLOSED",
+               [EN_LUTF_RC_ERR_THREAD_STARTUP*-1] = "RC_ERR_THREAD_START",
+               [EN_LUTF_RC_AGENT_NOT_FOUND*-1] = "RC_AGENT_NOT_FOUND",
+               [EN_LUTF_RC_PY_IMPORT_FAIL*-1] = "RC_PY_IMPORT_FAIL",
+               [EN_LUTF_RC_PY_SCRIPT_FAIL*-1] = "RC_PY_SCRIPT_FAIL",
+               [EN_LUTF_RC_RPC_FAIL*-1] = "RC_RPC_FAIL",
+               [EN_LUTF_RC_OOM*-1] = "RC_OOM",
+               [EN_LUTF_RC_BAD_PARAM*-1] = "RC_BAD_PARAM",
+               [EN_LUTF_RC_BAD_ADDR*-1] = "RC_BAD_ADDR",
+               [EN_LUTF_RC_MISSING_PARAM*-1] = "RC_MISSING_PARAM",
+               [EN_LUTF_RC_TIMEOUT*-1] = "RC_TIMEOUT",
+       };
+
+       if (rc <= EN_LUTF_RC_MAX)
+               return "BAD RC";
+
+       rc *= -1;
+
+       return str[rc];
+}
+
+int establishTCPConnection(unsigned long uiAddress,
+                          int iPort,
+                          bool b_non_block,
+                          bool endian);
+
+
+lutf_rc_t sendTcpMessage(int iTcpSocket, char *pcBody, int iBodySize);
+
+lutf_rc_t lutf_send_msg(int fd, char *msg, size_t msg_size,
+                       lutf_msg_type_t type);
+
+lutf_rc_t populateMsgHdr(int rsocket, char *msg_hdr,
+                        int msg_type, int msg_size,
+                        int lutf_version_number);
+
+lutf_rc_t readTcpMessage(int iFd, char *pcBuffer,
+                        int iBufferSize, int iTimeout);
+
+lutf_rc_t closeTcpConnection(int iTcpSocket);
+
+#endif /* LUTF_H */
diff --git a/lustre/tests/lutf/src/lutf_agent.h b/lustre/tests/lutf/src/lutf_agent.h
new file mode 100644 (file)
index 0000000..f509004
--- /dev/null
@@ -0,0 +1,197 @@
+#ifndef LUTF_AGENTS_H
+#define LUTF_AGENTS_H
+
+#include "lutf_common.h"
+
+#define MAX_NUM_AGENTS         1024
+#define HB_TO                  2
+
+struct cYAML;
+
+#define LUTF_AGENT_STATE_ALIVE (1 << 0)
+#define LUTF_AGENT_HB_CHANNEL_CONNECTED (1 << 1)
+#define LUTF_AGENT_RPC_CHANNEL_CONNECTED (1 << 2)
+#define LUTF_AGENT_WORK_IN_PROGRESS (1 << 3)
+#define LUTF_AGENT_ZOMBIE (1 << 4)
+
+typedef struct lutf_agent_blk_s {
+       pthread_mutex_t mutex;
+       unsigned int id;
+       unsigned int version;
+       unsigned int telnet_port;
+       unsigned int listen_port;
+       char name[MAX_STR_LEN];
+       char hostname[MAX_STR_LEN];
+       int iFileDesc;
+       int iRpcFd;
+       struct timeval time_stamp;
+       struct sockaddr_in addr;
+       unsigned int state;
+       unsigned int ref_count;
+       lutf_type_t node_type;
+} lutf_agent_blk_t;
+
+/* lutf_agent_get_highest_fd
+ *     Find the highest connected FD in all connected agents.
+ */
+int lutf_agent_get_highest_fd(void);
+
+/* agent_state2str
+ *     print agent state
+ */
+char *agent_state2str(lutf_agent_blk_t *agent);
+
+/* get_local_ip
+ *   gets the local IP address being used to send messages to the master
+ */
+char *get_local_ip();
+
+/*
+ * get_next_active_agent
+ *     given an index start searching from that point on to find an
+ *     active agent.
+ *     To reset the loop start the index from 0
+ */
+int get_next_active_agent(int idx, lutf_agent_blk_t **out);
+
+/*
+ * find_agent_blk_by_id
+ *     Find the agent blk given an internal ID
+ *     Agent ref-count is incremented
+ */
+lutf_agent_blk_t *find_agent_blk_by_id(int idx);
+
+/*
+ * find_agent_blk_by_ip
+ *     Find the agent blk given its IP address
+ *     Agent ref-count is incremented
+ */
+lutf_agent_blk_t *find_agent_blk_by_ip(char *ip);
+
+/*
+ * find_agent_blk_by_name
+ *     Find the agent blk given its name
+ *     Agent ref-count is incremented
+ */
+lutf_agent_blk_t *find_agent_blk_by_name(char *name);
+
+/*
+ * find_create_agent_blk_by_addr
+ *     return an agent block with this address or create a new one
+ */
+lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr);
+
+/*
+ * find_free_agent_blk
+ *     Find a free agent block
+ */
+lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr);
+
+/*
+ * free_agent_blk
+ *     Free an agent blk that no longer is needed
+ */
+void free_agent_blk(int id);
+
+/*
+ * acquire_agent_blk
+ *     acquire the agent for work
+ */
+void acquire_agent_blk(lutf_agent_blk_t *agent);
+
+/*
+ * release_agent_blk
+ *     Release the agent blk
+ */
+void release_agent_blk(lutf_agent_blk_t *agent);
+
+/*
+ * agent_ip2str
+ *     Returns the ip string representation
+ */
+char *agent_ip2str(lutf_agent_blk_t *agent);
+
+/*
+ * agent_hb_check
+ *     Given a time struct insure that the agent doesn't exceed the HB
+ *     time.
+ */
+void agent_hb_check(struct timeval *t, lutf_type_t whoami);
+
+/*
+ * agent_disable_hb
+ *     Disables the HB
+ */
+void agent_disable_hb(void);
+
+/*
+ * agent_enable_hb
+ *     Enables the HB
+ */
+void agent_enable_hb(void);
+
+/*
+ * get the number of registered agents
+ */
+int get_num_agents(void);
+
+/*
+ * Connect to masterIP:masterPort and get the number of agents connected
+ * to the master
+ */
+int get_num_agents_remote(char *masterIP, int masterPort);
+
+/*
+ * wait for the agents specified to be connected. If we don't get the full
+ * list within the time specified then fail
+ */
+lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout);
+
+/*
+ * set_agent_state
+ *
+ * convenience function to set the agent state
+ */
+void set_agent_state(lutf_agent_blk_t *agent, unsigned int state);
+
+/*
+ * unset_agent_state
+ *
+ * unset the state and check if the agent is a zombie and
+ * it has not pending work. If so then free it
+ */
+void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state);
+
+/*
+ * lutf_send_rpc
+ *   send an RPC message and wait for the RPC response
+ *   RPCs always come in request/response pairs. This function will send
+ *   the request and will block until it gets the response. If it doesn't
+ *   get a response in the specified timeout it'll fail.
+ *   Parameters:
+ *     target: name of the agent to send to
+ *     yaml: NULL terminated string to send to the target
+ *     timeout: to wait for response
+ *     rsp: rpc response
+ *
+ *  Return:
+ *     Returns a string YAML block
+ */
+lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp);
+
+/*
+ * lutf_send_rpc_rsp
+ *   send a response to the RPC origin.
+ *   Parameters:
+ *      target: name of the agent to send to
+ *      yaml: NULL terminated string to send to the target
+ */
+lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml);
+
+/*
+ * agent_init
+ *     Initialize the agent module
+ */
+void agent_init(void);
+
+#endif /* LUTF_AGENTS_H */
diff --git a/lustre/tests/lutf/src/lutf_agent.swg b/lustre/tests/lutf/src/lutf_agent.swg
new file mode 100644 (file)
index 0000000..881c29a
--- /dev/null
@@ -0,0 +1,9 @@
+%module clutf_agent
+
+%{
+#include <netinet/in.h>
+#include "lutf_agent.h"
+%}
+
+#include "lutf_agent.h"
+
diff --git a/lustre/tests/lutf/src/lutf_common.h b/lustre/tests/lutf/src/lutf_common.h
new file mode 100644 (file)
index 0000000..8db40cb
--- /dev/null
@@ -0,0 +1,89 @@
+#ifndef LUTF_COMMON_H
+#define LUTF_COMMON_H
+
+#define RESET   "\033[0m"
+#define BLACK   "\033[30m"      /* Black */
+#define RED     "\033[31m"      /* Red */
+#define GREEN   "\033[32m"      /* Green */
+#define YELLOW  "\033[33m"      /* Yellow */
+#define BLUE    "\033[34m"      /* Blue */
+#define MAGENTA "\033[35m"      /* Magenta */
+#define CYAN    "\033[36m"      /* Cyan */
+#define WHITE   "\033[37m"      /* White */
+#define BOLDBLACK   "\033[1m\033[30m"      /* Bold Black */
+#define BOLDRED     "\033[1m\033[31m"      /* Bold Red */
+#define BOLDGREEN   "\033[1m\033[32m"      /* Bold Green */
+#define BOLDYELLOW  "\033[1m\033[33m"      /* Bold Yellow */
+#define BOLDBLUE    "\033[1m\033[34m"      /* Bold Blue */
+#define BOLDMAGENTA "\033[1m\033[35m"      /* Bold Magenta */
+#define BOLDCYAN    "\033[1m\033[36m"      /* Bold Cyan */
+#define BOLDWHITE   "\033[1m\033[37m"      /* Bold White */
+
+#define LUTF_VERSION_NUMBER             1
+
+#define MAX_STR_LEN                     1024
+#define MAX_MSG_SIZE                    2048
+
+#define LUTF_EXIT_NORMAL                0
+#define LUTF_EXIT_ERR_STARTUP          -1
+#define LUTF_EXIT_ERR_BAD_PARAM                -2
+#define LUTF_EXIT_ERR_THREAD_STARTUP   -3
+#define LUTF_EXIT_ERR_DEAMEON_STARTUP  -4
+
+#define SYSTEMIPADDR                   0x7f000001
+#define INVALID_TCP_SOCKET             -1
+#define SOCKET_TIMEOUT_USEC            900000
+#define SOCKET_CONN_TIMEOUT_SEC                2
+#define TCP_READ_TIMEOUT_SEC           20
+
+/* default names */
+#define TEST_ROLE_GRC          "GENERIC"
+#define TEST_ROLE_MGS          "MGS"
+#define TEST_ROLE_MDT          "MDT"
+#define TEST_ROLE_OSS          "OSS"
+#define TEST_ROLE_OST          "OST"
+#define TEST_ROLE_RTR          "RTR"
+#define TEST_ROLE_CLI          "CLI"
+
+#define DEFAULT_MASTER_PORT    8282
+
+typedef enum {
+       EN_LUTF_RC_OK = 0,
+       EN_LUTF_RC_FAIL = -1,
+       EN_LUTF_RC_SYS_ERR = -2,
+       EN_LUTF_RC_BAD_VERSION = -3,
+       EN_LUTF_RC_SOCKET_FAIL = -4,
+       EN_LUTF_RC_BIND_FAILED = -5,
+       EN_LUTF_RC_LISTEN_FAILED = -6,
+       EN_LUTF_RC_CLIENT_CLOSED = -7,
+       EN_LUTF_RC_ERR_THREAD_STARTUP = -8,
+       EN_LUTF_RC_AGENT_NOT_FOUND = -9,
+       EN_LUTF_RC_PY_IMPORT_FAIL = -10,
+       EN_LUTF_RC_PY_SCRIPT_FAIL = -11,
+       EN_LUTF_RC_RPC_FAIL = -12,
+       EN_LUTF_RC_OOM = -13,
+       EN_LUTF_RC_BAD_PARAM = -14,
+       EN_LUTF_RC_BAD_ADDR = -15,
+       EN_LUTF_RC_MISSING_PARAM = -16,
+       EN_LUTF_RC_TIMEOUT = -17,
+       EN_LUTF_RC_MAX = -18,
+} lutf_rc_t;
+
+typedef enum lutf_type {
+       EN_LUTF_MASTER = 1,
+       EN_LUTF_AGENT = 2,
+       EN_LUTF_INVALID,
+} lutf_type_t;
+
+#define INTERACTIVE "interactive"
+#define BATCH "batch"
+#define DAEMON "daemon"
+
+typedef enum lutf_run_mode {
+       EN_LUTF_RUN_INTERACTIVE = 1,
+       EN_LUTF_RUN_BATCH = 2,
+       EN_LUTF_RUN_DAEMON = 3,
+       EN_LUTF_RUN_INVALID,
+} lutf_run_mode_t;
+
+#endif /* LUTF_COMMON_H */
diff --git a/lustre/tests/lutf/src/lutf_global.h b/lustre/tests/lutf/src/lutf_global.h
new file mode 100644 (file)
index 0000000..29b808a
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef LUTF_CONNECT_H
+#define LUTF_CONNECT_H
+
+#include "lutf_common.h"
+
+/* accessor functions to get global information */
+
+char *get_lutf_path(void);
+char *get_py_path(void);
+char *get_master_name(void);
+char *get_suite_name(void);
+char *get_script_name(void);
+char *get_matching_pattern(void);
+int get_master_listen_port(void);
+char *get_node_name(void);
+int get_agent_telnet_port(void);
+char *get_master_address(void);
+int get_master_port(void);
+lutf_run_mode_t get_lutf_mode(void);
+lutf_type_t get_lutf_type(void);
+char *get_lutf_results_file_path(void);
+char *get_lutf_cfg_file_path(void);
+char *get_lutf_tmp_dir(void);
+
+#endif /* LUTF_CONNECT_H */
diff --git a/lustre/tests/lutf/src/lutf_global.swg b/lustre/tests/lutf/src/lutf_global.swg
new file mode 100644 (file)
index 0000000..1cb2666
--- /dev/null
@@ -0,0 +1,12 @@
+%module clutf_global
+
+%{
+#include <netinet/in.h>
+typedef unsigned int bool;
+#define true 1
+#define false 0
+#include "lutf_global.h"
+%}
+
+#include "lutf_global.h"
+
diff --git a/lustre/tests/lutf/src/lutf_listener.c b/lustre/tests/lutf/src/lutf_listener.c
new file mode 100644 (file)
index 0000000..51690b6
--- /dev/null
@@ -0,0 +1,613 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <string.h>
+#include <signal.h>
+#include "lutf.h"
+#include "lutf_python.h"
+#include "lutf_agent.h"
+#include "lutf_message.h"
+#include "lutf_listener.h"
+
+static fd_set g_tAllSet;
+static bool g_bShutdown = false;
+static int g_iListenFd = INVALID_TCP_SOCKET;
+bool g_agent_enable_hb = true;
+
+typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
+
+static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
+static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
+static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
+
+static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
+       [EN_MSG_TYPE_HB] = process_msg_hb,
+       [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
+       [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
+};
+
+void lutf_listener_shutdown(void)
+{
+       g_bShutdown = true;
+}
+
+int get_highest_fd(void)
+{
+       int iAgentFd = lutf_agent_get_highest_fd();
+       int iMaxFd;
+
+       if (iAgentFd > g_iListenFd)
+               iMaxFd = iAgentFd;
+       else
+               iMaxFd = g_iListenFd;
+       PDEBUG("Current highest FD = %d", iMaxFd);
+
+       return iMaxFd;
+}
+
+static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
+{
+       lutf_rc_t rc;
+
+       agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
+       rc = python_handle_rpc_request(msg);
+       agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
+
+       return rc;
+}
+
+static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
+{
+       lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
+       //PERROR("Procesing HB message");
+
+       /* endian convert message */
+       hb->telnet_port = ntohl(hb->telnet_port);
+       hb->node_type = ntohl(hb->node_type);
+
+       /* update the agent with the information */
+       agent->telnet_port = hb->telnet_port;
+       agent->node_type = hb->node_type;
+       strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
+       agent->hostname[MAX_STR_LEN-1] = '\0';
+       strncpy(agent->name, hb->node_name, MAX_STR_LEN);
+       agent->name[MAX_STR_LEN-1] = '\0';
+       gettimeofday(&agent->time_stamp, NULL);
+
+       return EN_LUTF_RC_OK;
+}
+
+static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
+{
+       lutf_rc_t rc;
+       lutf_msg_num_agents_query_t query;
+
+       query.num_agents = get_num_agents();
+       rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
+       if (rc) {
+               PERROR("failed to send tcp message to get num agents query");
+               return rc;
+       }
+
+       return EN_LUTF_RC_OK;
+}
+
+static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
+{
+       lutf_rc_t rc = EN_LUTF_RC_OK;
+       lutf_message_hdr_t hdr;
+       char *buffer;
+       msg_process_fn_t proc_fn;
+
+       /* get the header first */
+       rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
+                           TCP_READ_TIMEOUT_SEC);
+
+       if (rc)
+               return rc;
+
+       hdr.version = ntohl(hdr.version);
+       if (hdr.version != LUTF_VERSION_NUMBER) {
+               PERROR("version %d != %d", hdr.version,
+                      LUTF_VERSION_NUMBER);
+               return EN_LUTF_RC_BAD_VERSION;
+       }
+
+       /* if the ips don't match ignore the message */
+       if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
+               return rc;
+
+       hdr.type = ntohl(hdr.type);
+       hdr.len = ntohl(hdr.len);
+
+       buffer = calloc(hdr.len, 1);
+       if (!buffer)
+               return EN_LUTF_RC_OOM;
+
+       /* get the rest of the message */
+       rc = readTcpMessage(fd, buffer, hdr.len,
+                           TCP_READ_TIMEOUT_SEC);
+
+       if (rc) {
+               free(buffer);
+               return rc;
+       }
+
+       /* call the appropriate processing function */
+       proc_fn = msg_process_tbl[hdr.type];
+       if (proc_fn)
+               rc = proc_fn(buffer, agent);
+
+       free(buffer);
+       return rc;
+}
+
+static lutf_rc_t init_comm(unsigned short server_port)
+{
+       int iFlags;
+       struct sockaddr_in sServAddr;
+
+       signal(SIGPIPE, SIG_IGN);
+
+       /*  Create a socket to listen to.  */
+       g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
+       if (g_iListenFd < 0) {
+               /*  Cannot create a listening socket.  */
+               return EN_LUTF_RC_SOCKET_FAIL;
+       }
+
+       /* Set a socket option which will allow us to be quickly restarted
+        * if necessary.
+        */
+       iFlags = 1;
+       if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
+                      sizeof(iFlags)) < 0) {
+               /*  Cannot change the socket options.  */
+               closeTcpConnection(g_iListenFd);
+               return EN_LUTF_RC_FAIL;
+       }
+
+       /*  Bind to our listening socket.  */
+       bzero((char *) &sServAddr, sizeof(sServAddr));
+       sServAddr.sin_family = AF_INET;
+       sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+       sServAddr.sin_port = htons(server_port);
+
+       if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) {
+               /*  Cannot bind our listening socket.  */
+               closeTcpConnection(g_iListenFd);
+               return EN_LUTF_RC_BIND_FAILED;
+       }
+
+       /*  Let the system know we wish to listen to this port for
+        *  connections. */
+       if (listen(g_iListenFd, 2) < 0) {
+               /*  Cannot listen to socket, close and fail  */
+               closeTcpConnection(g_iListenFd);
+               return EN_LUTF_RC_LISTEN_FAILED;
+       }
+
+       /* We want this socket to be non-blocking even though it will be used
+        * in a blocking select call. This is to avoid a problem identified by
+        * Richard Stevens.
+        */
+       iFlags = fcntl(g_iListenFd, F_GETFL, 0);
+       fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
+
+       /*  Add the listening socket to our select() mask.  */
+       FD_ZERO(&g_tAllSet);
+       FD_SET(g_iListenFd, &g_tAllSet);
+
+       return EN_LUTF_RC_OK;
+}
+
+static inline int close_agent_connection(lutf_agent_blk_t *agent)
+{
+       FD_CLR(agent->iFileDesc, &g_tAllSet);
+       FD_CLR(agent->iRpcFd, &g_tAllSet);
+       closeTcpConnection(agent->iRpcFd);
+       closeTcpConnection(agent->iFileDesc);
+       agent->state &=
+               ~LUTF_AGENT_RPC_CHANNEL_CONNECTED;
+       agent->state &=
+               ~LUTF_AGENT_HB_CHANNEL_CONNECTED;
+       return get_highest_fd();
+}
+
+lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
+                 int type)
+{
+       lutf_msg_hb_t hb;
+       int rc;
+
+       hb.telnet_port = htonl(telnet_port);
+       hb.node_type = htonl(type);
+       strncpy(hb.node_name, name, MAX_STR_LEN);
+       hb.node_name[MAX_STR_LEN-1] = '\0';
+       gethostname(hb.node_hostname, MAX_STR_LEN);
+
+       /* send the heart beat */
+       rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
+                          sizeof(hb), EN_MSG_TYPE_HB);
+       if (rc != EN_LUTF_RC_OK) {
+               PERROR("Failed to send heart beat %s\n",
+                       lutf_rc2str(rc));
+       }
+
+       return rc;
+}
+
+lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
+{
+       /* we assume the first connection is an HB connection */
+       if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
+               if (agent->iFileDesc != INVALID_TCP_SOCKET) {
+                       PERROR("agent in unexpected state. "
+                              "state is %s, but HB FD is %d",
+                              agent_state2str(agent), fd);
+                       return EN_LUTF_RC_SYS_ERR;
+               } else {
+                       PDEBUG("HB Channel Connected: %d", fd);
+                       agent->iFileDesc = fd;
+                       agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+                       return EN_LUTF_RC_OK;
+               }
+       } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
+               if (agent->iRpcFd != INVALID_TCP_SOCKET) {
+                       PERROR("agent in unexpected state. "
+                              "state is %s, but RPC FD is %d",
+                              agent_state2str(agent), fd);
+                       return EN_LUTF_RC_SYS_ERR;
+               } else {
+                       PDEBUG("RPC Channel Connected: %d", fd);
+                       agent->iRpcFd = fd;
+                       agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
+                       return EN_LUTF_RC_OK;
+               }
+       }
+
+       PERROR("agent is in an unexpected state on connection %s",
+              agent_state2str(agent));
+       return EN_LUTF_RC_SYS_ERR;
+}
+
+/*
+ * lutf_listener_main
+ *   main loop.  Listens for incoming agent connections, and for agent
+ *   messages.  Every period of time it triggers a walk through the agent
+ *   list to see if any of the HBs stopped
+ *
+ *   If I am an Agent, then attempt to connect to the master and add an
+ *   agent block on the list of agents. After successful connection send
+ *   a regular heart beat.
+ *
+ *   Since the master's agent block is on the list of agents and its FD is
+ *   on the select FD set, then if the master sends the agent a message
+ *   the agent should be able to process it.
+ */
+void *lutf_listener_main(void *usr_data)
+{
+       int iConnFd;
+       struct sockaddr_in sCliAddr;
+       socklen_t  tCliLen;
+       fd_set tReadSet;
+       int iNReady;
+       int iMaxSelectFd;
+       int i;
+       lutf_rc_t rc;
+       lutf_agent_blk_t *agent = NULL, *master = NULL;
+       struct timeval time_1, time_2, select_to;
+       lutf_listener_info_t *info;
+       bool master_connected = false;
+
+       info = (lutf_listener_info_t *)usr_data;
+       if ((!info) ||
+           ((info) && (info->listen_port == 0))) {
+               PERROR("No liston port provided");
+               return NULL;
+       }
+
+       rc = init_comm(info->listen_port);
+       if (rc) {
+               PERROR("init_comm failed: %s", lutf_rc2str(rc));
+               return NULL;
+       }
+
+       agent_init();
+
+       iMaxSelectFd = g_iListenFd;
+
+       gettimeofday(&time_1, NULL);
+
+
+       /*  Main Processing Loop: Keep going until we have reason to shutdown. */
+       while (!g_bShutdown) {
+               /*  Wait on our select mask for an event to occur.  */
+               tReadSet = g_tAllSet;
+
+               select_to.tv_sec = HB_TO;
+               select_to.tv_usec = 0;
+
+               iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to);
+
+               /*  Determine if we failed the select call */
+               if (iNReady < 0) {
+                       /*  Check to see if we were interrupted by a signal.  */
+                       if ((errno == EINTR) || (errno == EAGAIN)) {
+                               PERROR("Select failure: errno = %d", errno);
+                       } else {
+                               /*  If this is an ECONNABORTED error, just ignore it.  */
+                               if (errno != ECONNABORTED) {
+                                       /* Raise a fatal alarm.  */
+                                       /* Shut down */
+                                       PERROR("Shutting down Listener thread. errno: %d", errno);
+                                       g_bShutdown = true;
+                               }
+                       }
+               } else {
+                       if (FD_ISSET(g_iListenFd, &tReadSet)) {
+                               /*  A new client is trying to connect.  */
+                               tCliLen = sizeof(sCliAddr);
+                               iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr,
+                                                &tCliLen);
+                               if (iConnFd < 0) {
+                                       /*  Cannot accept new connection...just ignore.  */
+                                       if (errno != EWOULDBLOCK)
+                                               PERROR("Error on accept(), errno = %d", errno);
+                               } else {
+                                       /* Try to see if we have an agent
+                                        * with the same address, since
+                                        * agents can have multiple tcp
+                                        * connections open
+                                        */
+                                       agent = find_create_agent_blk_by_addr(&sCliAddr);
+                                       if (!agent) {
+                                               /*  Cannot support more clients...just ignore.  */
+                                               PERROR("Cannot accept more clients");
+                                               closeTcpConnection(iConnFd);
+                                       } else {
+                                               int iOption, iFlags;
+
+                                               rc = complete_agent_connection(agent,
+                                                               iConnFd);
+                                               if (rc != EN_LUTF_RC_OK) {
+                                                       int agent_id = agent->id;
+                                                       iMaxSelectFd = close_agent_connection(agent);
+                                                       release_agent_blk(agent);
+                                                       free_agent_blk(agent_id);
+                                                       continue;
+                                               }
+
+                                               /* all nodes listen on the
+                                                * same port
+                                                */
+                                               agent->listen_port = info->listen_port;
+
+                                               /*  Add new client to our select mask.  */
+                                               FD_SET(iConnFd, &g_tAllSet);
+                                               iMaxSelectFd = get_highest_fd();
+
+                                               /* Ok, it seems that the connected socket gains
+                                                * the same flags as the listen socket.  We want
+                                                * to make it blocking here.
+                                                */
+                                               iFlags = fcntl(iConnFd, F_GETFL, 0);
+                                               fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
+
+                                               /*  And, we want to turn off Nagle's algorithm to
+                                                *  reduce latency
+                                                */
+                                               iOption = 1;
+                                               setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
+                                                          (void *)&iOption,
+                                                          sizeof(iOption));
+
+                                               PDEBUG("Received a connection from %s on FD %d\n",
+                                                      inet_ntoa(agent->addr.sin_addr), iConnFd);
+
+                                               release_agent_blk(agent);
+                                       }
+                               }
+
+                               /*  See if there are other messages waiting.  */
+                               iNReady--;
+                       }
+
+                       /* need to iterate through the clients and see if a
+                        * message was sent to any of them
+                        */
+                       for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
+                               /* reset the return code to avoid misbehaving on previous
+                                * returns
+                                */
+                               rc = EN_LUTF_RC_OK;
+
+                               if ((agent = find_agent_blk_by_id(i))) {
+                                       int hb_fd = INVALID_TCP_SOCKET;
+                                       int rpc_fd = INVALID_TCP_SOCKET;
+
+                                       if (FD_ISSET(agent->iFileDesc, &tReadSet))
+                                               hb_fd = agent->iFileDesc;
+                                       if (FD_ISSET(agent->iRpcFd, &tReadSet))
+                                               rpc_fd = agent->iRpcFd;
+
+                                       if (hb_fd == INVALID_TCP_SOCKET &&
+                                           rpc_fd == INVALID_TCP_SOCKET)
+                                               continue;
+
+                                       /* process heart beat */
+                                       if (hb_fd != INVALID_TCP_SOCKET) {
+                                               /* process the message */
+                                               rc = process_agent_message(agent, hb_fd);
+                                               if (rc)
+                                                       PERROR("msg failure: %s",
+                                                              lutf_rc2str(rc));
+                                       }
+                                       if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+                                               int agent_id = agent->id;
+                                               if (agent->id == master->id) {
+                                                       PERROR("Disconnected from master. Will attempt to reconnect");
+                                                       master_connected = false;
+                                               }
+                                               iMaxSelectFd = close_agent_connection(agent);
+                                               release_agent_blk(agent);
+                                               free_agent_blk(agent_id);
+                                               continue;
+                                       }
+
+                                       /* process rpc */
+                                       if (rpc_fd != INVALID_TCP_SOCKET) {
+                                               /* process the message */
+                                               rc = process_agent_message(agent, rpc_fd);
+                                               if (rc)
+                                                       PERROR("msg failure: %s",
+                                                              lutf_rc2str(rc));
+                                       }
+                                       if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+                                               int agent_id = agent->id;
+                                               if (agent->id == master->id) {
+                                                       PERROR("Disconnected from master. Will attempt to reconnect");
+                                                       master_connected = false;
+                                               }
+                                               iMaxSelectFd = close_agent_connection(agent);
+                                               release_agent_blk(agent);
+                                               free_agent_blk(agent_id);
+                                               continue;
+                                       }
+                                       release_agent_blk(agent);
+                               }
+                       }
+
+                       /* establish connection with the master if I'm an agent
+                        * and I have not connected to the master yet.
+                        * Otherwise send a heart beat
+                        */
+                       if (!master_connected &&
+                           strlen(g_lutf_cfg.master_name) != 0) {
+                               PDEBUG("Attempting a connection on master %s",
+                                      g_lutf_cfg.master_name);
+                               master = find_free_agent_blk(&info->hb_info.master_address);
+                               if (!master) {
+                                       PERROR("Failed to allocate agent block");
+                                       continue;
+                               }
+
+                               iConnFd = establishTCPConnection(
+                                       info->hb_info.master_address.sin_addr.s_addr,
+                                       htons(info->hb_info.master_address.sin_port),
+                                       true, false);
+
+                               if (iConnFd < 0) {
+                                       int master_id = master->id;
+
+                                       PERROR("establishTCPConnection failure: %s. Clearing set",
+                                               lutf_rc2str(iConnFd));
+                                       iMaxSelectFd = close_agent_connection(master);
+                                       release_agent_blk(master);
+                                       free_agent_blk(master_id);
+                                       PERROR("Disconnected from master. Will attempt to reconnect");
+                                       master_connected = false;
+                                       continue;
+                               }
+
+                               master->iFileDesc = iConnFd;
+                               memcpy(&master->addr,
+                                      &info->hb_info.master_address,
+                                      sizeof(master->addr));
+                               strncpy(master->name, g_lutf_cfg.master_name,
+                                       MAX_STR_LEN);
+                               master->name[MAX_STR_LEN-1] = '\0';
+                               master->node_type = EN_LUTF_MASTER;
+                               gethostname(master->hostname, MAX_STR_LEN);
+                               master->telnet_port = info->hb_info.agent_telnet_port;
+                               release_agent_blk(master);
+
+                               PDEBUG("Connected to master %s on fd %d",
+                                      master->name, master->iFileDesc);
+
+                               /*
+                                * add the master FD to the select FD set
+                                * to be able to process master messages
+                                */
+                               FD_SET(iConnFd, &g_tAllSet);
+                               iMaxSelectFd = get_highest_fd();
+
+                               master_connected = true;
+                               master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+                       }
+/*
+                       if (info->type == EN_LUTF_AGENT) {
+                               rc = send_hb(master, info->hb_info.node_name,
+                                            info->hb_info.agent_telnet_port,
+                                            info->type);
+                               if (rc != EN_LUTF_RC_OK) {
+                                       master_connected = false;
+                                       iMaxSelectFd = get_highest_fd();
+                               }
+                       }
+*/
+                       /*
+                        * Get the time stamp and go through each agent
+                        * and see if it's still healthy.  For agents which
+                        * aren't healthy move off to the dead_list.
+                        * This operation is only valid if I'm a master
+                        */
+                       gettimeofday(&time_2, NULL);
+                       if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) {
+                               /* check if HB_TO seconds has passed since the last
+                                * time we collected the time */
+                               if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
+
+                                       /* do the heartbeat check */
+                                       agent_hb_check(&time_1, info->type);
+                               }
+                       }
+
+                       if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
+                               lutf_agent_blk_t *agent = NULL;
+                               int idx = 0;
+
+                               do {
+                                       idx = get_next_active_agent(idx, &agent);
+                                       /* A master doesn't send a heart
+                                        * beat to himself */
+                                       if (agent) {
+                                               if (info->type == EN_LUTF_MASTER &&
+                                                   agent->id == master->id)
+                                                       continue;
+                                               int agent_id = agent->id;
+                                               rc = send_hb(agent, info->hb_info.node_name,
+                                                            info->hb_info.agent_telnet_port,
+                                                            info->type);
+                                               if (rc != EN_LUTF_RC_OK) {
+                                                       if (agent->id == master->id) {
+                                                               PERROR("Disconnected from master. Will attempt to reconnect");
+                                                               master_connected = false;
+                                                       }
+                                                       iMaxSelectFd = close_agent_connection(agent);
+                                                       release_agent_blk(agent);
+                                                       free_agent_blk(agent_id);
+                                               } else {
+                                                       release_agent_blk(agent);
+                                               }
+                                       }
+                               } while (agent);
+                       }
+               }
+               /* store the current time */
+               time_1 = time_2;
+       }
+
+       /* Zero out the g_tAllSet */
+       FD_ZERO(&g_tAllSet);
+
+       return NULL;
+}
diff --git a/lustre/tests/lutf/src/lutf_listener.h b/lustre/tests/lutf/src/lutf_listener.h
new file mode 100644 (file)
index 0000000..629f677
--- /dev/null
@@ -0,0 +1,14 @@
+#ifndef LUTF_LISTENER_H
+#define LUTF_LISTENER_H
+
+#include "lutf_common.h"
+
+/*
+ * lutf_listener_main
+ *   Main loop of the listener thread
+ */
+void *lutf_listener_main(void *usr_data);
+
+void lutf_listener_shutdown(void);
+
+#endif /* LUTF_LISTENER_H */
diff --git a/lustre/tests/lutf/src/lutf_message.h b/lustre/tests/lutf/src/lutf_message.h
new file mode 100644 (file)
index 0000000..efbd918
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef LUTF_MESSAGE_H
+#define LUTF_MESSAGE_H
+
+#include "lutf_common.h"
+
+typedef enum {
+       EN_MSG_TYPE_HB = 0,
+       EN_MSG_TYPE_GET_NUM_AGENTS,
+       EN_MSG_TYPE_RPC_REQUEST,
+       EN_MSG_TYPE_RPC_RESPONSE,
+       EN_MSG_TYPE_MAX
+} lutf_msg_type_t;
+
+typedef struct lutf_message_hdr_s {
+       lutf_msg_type_t type;
+       unsigned int len;
+       struct in_addr ip;
+       unsigned int version;
+} lutf_message_hdr_t;
+
+typedef struct lutf_msg_hb_s {
+       unsigned int telnet_port;
+       lutf_type_t node_type;
+       char node_name[MAX_STR_LEN];
+       char node_hostname[MAX_STR_LEN];
+} lutf_msg_hb_t;
+
+typedef struct lutf_msg_num_agents_query_s {
+       int num_agents;
+} lutf_msg_num_agents_query_t;
+
+#endif /* LUTF_MESSAGE_H */
diff --git a/lustre/tests/lutf/src/lutf_python.c b/lustre/tests/lutf/src/lutf_python.c
new file mode 100644 (file)
index 0000000..a3efe9d
--- /dev/null
@@ -0,0 +1,244 @@
+#include <Python.h>
+#include <netinet/in.h>
+#include "lutf.h"
+#include "lutf_python.h"
+#include "lutf_listener.h"
+
+extern lutf_config_params_t g_lutf_cfg;
+bool g_py_inited = false;
+
+static char *get_path_segment(char *path, int *len, char **more)
+{
+       char *found = strchr(path, ':');
+       char *str = path;
+
+       if (!found) {
+               *more = NULL;
+               *len = strlen(str);
+               return str;
+       }
+
+       *len = found - str;
+       if (*(found + 1) == '\n')
+               *more = NULL;
+       else
+               *more = found + 1;
+
+       return str;
+}
+
+static void python_run_interactive_shell(void)
+{
+       char buf[MAX_STR_LEN + 20];
+       char segment[MAX_STR_LEN];
+       char *seg;
+       char *more = g_lutf_cfg.py_path;
+       int len = 0;
+       lutf_rc_t rc;
+
+       PyRun_SimpleString("import code\n");
+       PyRun_SimpleString("import os\n");
+       PyRun_SimpleString("import sys\n");
+       PyRun_SimpleString("import readline\n");
+
+       /* all other paths are figured out within python */
+       snprintf(buf, MAX_STR_LEN,
+               "sys.path.append(os.path.join('%s', 'python', 'infra'))",
+               g_lutf_cfg.lutf_path);
+       PyRun_SimpleString(buf);
+
+       snprintf(buf, MAX_STR_LEN,
+               "sys.path.append(\"%s/src\")\n",
+               g_lutf_cfg.lutf_path);
+       PyRun_SimpleString(buf);
+
+       while (more != NULL) {
+               seg = get_path_segment(more, &len, &more);
+               snprintf(segment, len+1, "%s", seg);
+               segment[len] = '\0';
+               snprintf(buf, sizeof(buf),
+                       "sys.path.append(\"%s\")\n", segment);
+               PyRun_SimpleString(buf);
+       }
+
+       PyRun_SimpleString("import lutf\n");
+       PyRun_SimpleString("from lutf import me,suites,"
+                          "agents,dumpGlobalTestResults,R,A,I,X\n");
+
+       g_py_inited = true;
+
+       if (g_lutf_cfg.shell == EN_LUTF_RUN_BATCH &&
+           g_lutf_cfg.l_info.type == EN_LUTF_MASTER) {
+               char *pattern = g_lutf_cfg.pattern;
+
+               PDEBUG("Running in Batch mode. Checking Agents are connected");
+
+               rc = wait_for_agents(g_lutf_cfg.agents, 20);
+               if (rc == EN_LUTF_RC_TIMEOUT) {
+                       PERROR("Not all agents connected. Aborting tests");
+                       return;
+               }
+
+               /* update the LUTF internal database */
+               PyRun_SimpleString("agents.reload()");
+               PDEBUG("Agents reloaded. Dumping");
+               PyRun_SimpleString("agents.dump()");
+
+               if (g_lutf_cfg.script && strlen(g_lutf_cfg.script) > 0) {
+                       snprintf(buf, MAX_STR_LEN,
+                                "suites['%s'].scripts['%s'].run()",
+                                g_lutf_cfg.suite,
+                                g_lutf_cfg.script);
+               } else if (g_lutf_cfg.suite && strlen(g_lutf_cfg.suite) > 0) {
+                       snprintf(buf, MAX_STR_LEN,
+                                "suites['%s'].run('%s')",
+                                g_lutf_cfg.suite, pattern);
+               } else {
+                       snprintf(buf, MAX_STR_LEN,
+                                "suites.run('%s')", pattern);
+               }
+               PDEBUG("%s", buf);
+               PyRun_SimpleString(buf);
+               snprintf(buf, MAX_STR_LEN,
+                        "dumpGlobalTestResults('%s')", g_lutf_cfg.results_file);
+               PDEBUG("%s", buf);
+               PyRun_SimpleString(buf);
+               PDEBUG("Shutting down the LUTF");
+               PyRun_SimpleString("me.exit()");
+               lutf_listener_shutdown();
+               return;
+       } else if (g_lutf_cfg.shell == EN_LUTF_RUN_INTERACTIVE) {
+               int rc;
+               char *intro;
+
+               PDEBUG("Running in Interactive mode");
+               /*
+                * start an independent shell
+                * Since we imported all the necessary modules to start in
+                * the main interpreter, copying the globals should copy
+                * them in the interactive shell.
+                */
+               PyRun_SimpleString("vars = globals().copy()\n");
+               PyRun_SimpleString("vars.update(locals())\n");
+               PyRun_SimpleString("shell = code.InteractiveConsole(vars)\n");
+               PyRun_SimpleString("shell.push('sys.ps1 = \"lutf>>> \"')\n");
+               PyRun_SimpleString("shell.push('sys.ps2 = \"lutf... \"')\n");
+
+               /* import base lutf module */
+               g_py_inited = true;
+               intro = "shell.interact(\"Welcome to the Lustre Unit Test Framework (LUTF)\\n\""
+                       "\"Convenience Functions: R() = dumpGlobalTestResults(), A() = agents.dump(), I() = me.dump_intfs(), X() = me.exit()\")";
+               rc = PyRun_SimpleString(intro);
+               if (rc)
+                       goto python_shutdown;
+       } else {
+               /* run the telnet server. This becomes our main process
+                * now
+                */
+               PDEBUG("Running in Daemon mode");
+               sprintf(segment, "fname = os.path.join('%s', '%s')\n",
+                       g_lutf_cfg.lutf_path, OUT_PY_LOG);
+               if (PyRun_SimpleString(segment)) {
+                       PERROR("Failed to create log file");
+                       goto python_shutdown;
+               }
+               sprintf(segment, "logfile = open(fname, 'w')\n");
+               if (PyRun_SimpleString(segment)) {
+                       PERROR("Failed to open log file");
+                       goto python_shutdown;
+               }
+               if (PyRun_SimpleString("sys.stdout = sys.stderr = logfile\n")) {
+                       PERROR("Failed to redirect stdout and stderr");
+                       goto python_shutdown;
+               }
+               if (PyRun_SimpleString("from lutf_telnet_sr import LutfTelnetServer\n")) {
+                       PERROR("Failed to import LutfTelnetServer");
+                       goto python_shutdown;
+               }
+               sprintf(segment, "tns = LutfTelnetServer(%d)\n",
+                       g_lutf_cfg.l_info.hb_info.agent_telnet_port);
+               if (PyRun_SimpleString(segment)) {
+                       PERROR("Failed to instantiate LutfTelnetServer");
+                       goto python_shutdown;
+               }
+               if (PyRun_SimpleString("tns.run()\n")) {
+                       PERROR("Failed to run LutfTelnetServer instance");
+                       goto python_shutdown;
+               }
+               if (PyRun_SimpleString("logfile.close()")) {
+                       PERROR("Failed to close logfile");
+                       goto python_shutdown;
+               }
+python_shutdown:
+               PERROR("Exiting the python interpreter");
+       }
+       g_py_inited = false;
+       lutf_listener_shutdown();
+}
+
+/*
+ * gcc py.c -o py -I/usr/local/include/python2.7
+ * -L/usr/local/lib/python2.7/config -lm -ldl -lpthread -lutil -lpython2.7
+ */
+lutf_rc_t python_init(void)
+{
+       wchar_t program[5];
+
+       swprintf(program, 3, L"%hs", "lutf");
+
+       //char *path;
+       //char new_path[MAX_STR_LEN];
+       Py_SetProgramName(program);
+       //char *py_args[1];
+
+       //py_args[0] = argv[0];
+
+       Py_Initialize();
+
+       //sprintf(new_path, "%s:%s", path, script_path);
+       //PySys_SetPath(new_path);
+       //path = Py_GetPath();
+
+       python_run_interactive_shell();
+       PDEBUG("Python finalizing");
+
+       Py_Finalize();
+
+       PDEBUG("Python finalized");
+
+       return EN_LUTF_RC_OK;
+}
+
+lutf_rc_t python_handle_rpc_request(char *rpc)
+{
+       lutf_rc_t rc = EN_LUTF_RC_OK;
+       PyGILState_STATE gstate;
+       PyObject *handle_rpc_req;
+       PyObject *lutf;
+       PyObject *me;
+       PyObject *str;
+       PyObject *args;
+       PyObject *result;
+
+       if (!g_py_inited)
+               return EN_LUTF_RC_PY_SCRIPT_FAIL;
+
+       PDEBUG(rpc);
+
+       gstate = PyGILState_Ensure();
+
+       str = PyUnicode_FromString((char*)"lutf");
+       lutf = PyImport_Import(str);
+       me = PyObject_GetAttrString(lutf, (char*)"me");
+       handle_rpc_req = PyObject_GetAttrString(me, (char*)"handle_rpc_req");
+       args = PyTuple_Pack(1, PyUnicode_FromString(rpc));
+       result = PyObject_CallObject(handle_rpc_req, args);
+
+       if (!result)
+               PDEBUG("handle_rpc_req() didn't return any values");
+
+       PyGILState_Release(gstate);
+
+       return rc;
+}
+
diff --git a/lustre/tests/lutf/src/lutf_python.h b/lustre/tests/lutf/src/lutf_python.h
new file mode 100644 (file)
index 0000000..02298f5
--- /dev/null
@@ -0,0 +1,29 @@
+#ifndef LUTF_PYTHON_H
+#define LUTF_PYTHON_H
+
+#include <pthread.h>
+#include "lutf.h"
+
+typedef struct python_thread_data_s {
+       char **argv;
+} python_thread_data_t;
+
+/*
+ * python_init
+ *   Initialize the python interpreter.
+ */
+lutf_rc_t python_init(void);
+
+/*
+ * python_collect_agent_core
+ *   Collect core information from the specified agent
+ */
+lutf_rc_t python_collect_agent_core(char *ip);
+
+/*
+ * python_handle_rpc_request
+ *   Received an RPC now execute the operation in the python interpreter
+ */
+lutf_rc_t python_handle_rpc_request(char *rpc);
+
+#endif /* LUTF_PYTHON_H */
diff --git a/lustre/tests/lutf/swig_templates/generate_lnetconfig_swig_i.py b/lustre/tests/lutf/swig_templates/generate_lnetconfig_swig_i.py
new file mode 100755 (executable)
index 0000000..c73025a
--- /dev/null
@@ -0,0 +1,145 @@
+import os
+import sys
+
+def writeSwigI(block, ommit_list, swigI, function_ommit=[]):
+       # create a new block with the updated info
+       new_block = []
+       prev_line = ''
+       brace = 0
+       function_ommitting = False
+       for line in block:
+               skip = False
+               if function_ommitting:
+                       if '{' in line:
+                               brace += 1
+                       if '}' in line:
+                               brace -= 1
+                       if brace == 0:
+                               function_ommitting = False
+                       continue
+               for ommit in function_ommit:
+                       if ommit in line:
+                               # we found the function. We'll not be too smart and we'll
+                               # assume that if there is a beginning it'll be in the line
+                               # before.
+                               if prev_line != '':
+                                       new_block = new_block[:len(new_block)-1]
+                               function_ommitting = True;
+                               if '{' in line:
+                                       brace += 1
+                               skip = True
+                               break
+               if skip:
+                       continue
+               for ommit in ommit_list:
+                       if line.startswith(ommit):
+                               skip = True
+                               break
+               if skip:
+                       continue
+               new_block.append(line)
+               prev_line = line
+       for line in new_block:
+               # remove __user flag, since swig can't understand
+               # it
+               newline = line
+               if "#define __user" in newline:
+                       newline = line.replace('__user', '__lutf_user')
+               elif "#ifndef __user" in newline:
+                       newline = line.replace('__user', '__lutf_user')
+               else:
+                       newline = line.replace('__user', '')
+               swigI.write(newline)
+
+# set up the paths to all the files that need to be swigified
+netconfig_path = sys.argv[1] + '/lnet/utils/lnetconfig/liblnetconfig.h'
+cyaml_path = sys.argv[1] + '/lnet/utils/lnetconfig/cyaml.h'
+lib_dlc_path = sys.argv[1] + '/lnet/include/uapi/linux/lnet/lnet-dlc.h'
+string_path = sys.argv[1] + '/libcfs/include/libcfs/util/string.h'
+nidstr_path = sys.argv[1] + '/lnet/include/uapi/linux/lnet/nidstr.h'
+limits_path = '/usr/include/limits.h'
+typemap_path = sys.argv[1] + '/lustre/tests/lutf/swig_templates/typemap.template'
+lutf_missing_def = sys.argv[1] + '/lustre/tests/lutf/swig_templates/lutf_missing_definitions.h'
+lutf_extra_defs = sys.argv[1] + '/lustre/tests/lutf/swig_templates/liblnetconfig.template'
+swig_intf_path = sys.argv[1] + '/lustre/tests/lutf/src/liblnetconfig.i'
+
+# open these files
+i_netconfig = open(netconfig_path)
+i_cyaml = open(cyaml_path)
+i_lib_dlc = open(lib_dlc_path)
+i_string = open(string_path)
+i_nidstr = open(nidstr_path)
+i_limits = open(limits_path)
+i_typemap = open(typemap_path)
+i_lutf_missing_def = open(lutf_missing_def)
+
+# open the swig interface output file
+o_swig_intf = open(swig_intf_path, 'w')
+
+# read the files
+l_netconfig = i_netconfig.readlines()
+l_cyaml = i_cyaml.readlines()
+l_lib_dlc = i_lib_dlc.readlines()
+l_string = i_string.readlines()
+l_nidstr = i_nidstr.readlines()
+l_limits = i_limits.readlines()
+l_typemap = i_typemap.readlines()
+l_lutf_missing_def = i_lutf_missing_def.readlines()
+
+# identify all the lines that I'd like to remove from the swig interface
+# file.
+netconfig_ommit_list = ['/*', ' *', '#ifndef LIB_LNET_CONFIG_API_H', '#define LIB_LNET_CONFIG_API_H',
+                       'struct cYAML;', '#endif']
+cyaml_ommit_list = ['/*', ' *', '#ifndef CYAML_H', '#define CYAML_H', '#endif']
+generic_ommit_list = ['/*', ' *', '__printf(3, 4)']
+
+# write the swig interface file
+o_swig_intf.write('%module lnetconfig\n')
+o_swig_intf.write('%{\n')
+
+o_swig_intf.write("#include \"libcfs/util/ioctl.h\"\n")
+o_swig_intf.write("#include \"libcfs/util/string.h\"\n")
+
+writeSwigI(l_lutf_missing_def, [], o_swig_intf)
+writeSwigI(l_cyaml, cyaml_ommit_list, o_swig_intf)
+writeSwigI(l_netconfig, netconfig_ommit_list, o_swig_intf)
+writeSwigI(l_lib_dlc, generic_ommit_list, o_swig_intf)
+o_swig_intf.write('PyObject *lutf_parse_nidlist(char *str, int len, int max_nids);\n')
+o_swig_intf.write('char *lutf_nid2str(unsigned long nid);\n')
+
+o_swig_intf.write('%}\n')
+
+for line in l_typemap:
+       o_swig_intf.write(line)
+
+# handle the typdefs that are declared in
+# /usr/include/asm-generic/int-ll64.h
+# It appears that SWIG has a problem with __signed__ keyword
+o_swig_intf.write("typedef char __s8;\n")
+o_swig_intf.write("typedef unsigned char __u8;\n")
+
+o_swig_intf.write("typedef short __s16;\n")
+o_swig_intf.write("typedef unsigned short __u16;\n")
+
+o_swig_intf.write("typedef int __s32;\n")
+o_swig_intf.write("typedef unsigned int __u32;\n")
+
+o_swig_intf.write("typedef long long __s64;\n")
+o_swig_intf.write("typedef unsigned long long __u64;\n")
+
+
+o_swig_intf.write('PyObject *lutf_parse_nidlist(char *str, int len, int max_nids);\n')
+o_swig_intf.write('char *lutf_nid2str(unsigned long nid);\n')
+writeSwigI(l_limits, [], o_swig_intf)
+writeSwigI(l_cyaml, cyaml_ommit_list, o_swig_intf)
+writeSwigI(l_netconfig, netconfig_ommit_list, o_swig_intf)
+writeSwigI(l_lib_dlc, generic_ommit_list, o_swig_intf)
+function_ommit = ['int vscnprintf(char *buf, size_t bufsz, const char *format, va_list args)',
+                 'static inline int scnprintf(char *buf, size_t bufsz, const char *format, ...)']
+writeSwigI(l_string, generic_ommit_list, o_swig_intf, function_ommit=function_ommit)
+writeSwigI(l_nidstr, generic_ommit_list, o_swig_intf)
+
+with open(lutf_extra_defs, 'r') as f:
+       for line in f:
+               o_swig_intf.write(line)
+o_swig_intf.close()
diff --git a/lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py b/lustre/tests/lutf/swig_templates/generate_lutf_swig_i.py
new file mode 100755 (executable)
index 0000000..62b5f38
--- /dev/null
@@ -0,0 +1,31 @@
+import sys
+import os
+
+intf = open(sys.argv[2], 'r')
+contents = intf.readlines()
+intf.close()
+
+idx = 0
+for c in contents:
+       idx += 1
+       if "%}" in c:
+               break
+
+#typemap_path = sys.argv[1] + '/lustre/tests/lutf/swig_templates/typemap.template'
+typemap_path = os.path.join(sys.argv[1], 'typemap.template')
+i_typemap = open(typemap_path)
+l_typemap = i_typemap.readlines()
+i_typemap.close()
+
+j = 0
+for i in range(idx, idx + len(l_typemap)):
+       contents.insert(i, l_typemap[j])
+       j += 1
+
+new_i_file = os.path.splitext(sys.argv[2])[0]+'.i'
+
+intf = open(new_i_file, 'w')
+contents = "".join(contents)
+intf.write(contents)
+intf.close()
+
diff --git a/lustre/tests/lutf/swig_templates/liblnetconfig.template b/lustre/tests/lutf/swig_templates/liblnetconfig.template
new file mode 100644 (file)
index 0000000..68065d3
--- /dev/null
@@ -0,0 +1,34 @@
+%inline %{
+PyObject *lutf_parse_nidlist(char *str, int len, int max_nids) {
+       int rc, num_nids, i;
+       lnet_nid_t *nidl = calloc(sizeof(*nidl) * max_nids, 1);
+       struct list_head *l = calloc(sizeof(*l), 1);
+       PyObject *pylist;
+       if (!l || !nidl) {
+               if (l)
+                       free(l);
+               if (nidl)
+                       free(nidl);
+               return NULL;
+       }
+       INIT_LIST_HEAD(l);
+       rc = cfs_parse_nidlist(str, len, l);
+       if (!rc) {
+               free(l);
+               return NULL;
+       }
+       num_nids = cfs_expand_nidlist(l, nidl, max_nids);
+       cfs_free_nidlist(l);
+       pylist = PyList_New(num_nids);
+       for (i = 0; i < num_nids; i++) {
+               PyList_SetItem(pylist, i, PyLong_FromUnsignedLongLong(nidl[i]));
+       }
+       free(l);
+       free(nidl);
+       return pylist;
+}
+char *lutf_nid2str(unsigned long nid) {
+       return libcfs_nid2str(nid);
+}
+%}
+
diff --git a/lustre/tests/lutf/swig_templates/lutf_missing_definitions.h b/lustre/tests/lutf/swig_templates/lutf_missing_definitions.h
new file mode 100755 (executable)
index 0000000..4714dc0
--- /dev/null
@@ -0,0 +1,7 @@
+#ifndef LUTF_MISSING_DEFINITIONS_H
+#define LUTF_MISSING_DEFINITIONS_H
+
+size_t strlcpy(char *tgt, const char *src, size_t tgt_len);
+size_t strlcat(char *tgt, const char *src, size_t tgt_len);
+
+#endif /* LUTF_MISSING_DEFINITIONS_H */
diff --git a/lustre/tests/lutf/swig_templates/typemap.template b/lustre/tests/lutf/swig_templates/typemap.template
new file mode 100755 (executable)
index 0000000..f956dbd
--- /dev/null
@@ -0,0 +1,183 @@
+
+/* This is only for python2.7
+%typemap(in) FILE * {
+        $1 = PyFile_AsFile($input);
+}*/
+
+/* typemap for handling cYAML output parameter */
+%typemap(in, numinputs=0) struct cYAML** (struct cYAML *temp) {
+        temp = NULL;
+        $1 = &temp;
+}
+
+%typemap(argout) struct cYAML** {
+        /* The purpose of this typemap is to be able to handle out params
+           Ex: if the function being called is: foo(cYAML**a, cYAML **b)
+           then from python you'd call it: o1, o2 = foo()*/
+        PyObject *o, *o2, *o3;
+        o = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), $*1_descriptor, SWIG_POINTER_OWN);
+        if ((!$result) || ($result == Py_None))
+                $result = o;
+        else
+        {
+                if(!PyTuple_Check($result))
+                {
+                        /* insert the original result in the tuple */
+                        o2 = $result;
+                        $result = PyTuple_New(1);
+                        PyTuple_SetItem($result, 0, o2);
+                }
+                o3 = PyTuple_New(1);
+                PyTuple_SetItem(o3, 0, o);
+                o2 = $result;
+                $result = PySequence_Concat(o2, o3);
+                Py_DECREF(o2);
+                Py_DECREF(o3);
+        }
+}
+
+/* typemap for handling cfs_expr_list output parameter */
+%typemap(in, numinputs=0) struct cfs_expr_list** (struct cfs_expr_list *temp) {
+        temp = NULL;
+        $1 = &temp;
+}
+
+%typemap(argout) struct cfs_expr_list** {
+        /* The purpose of this typemap is to be able to handle out params
+           Ex: if the function being called is: rc = foo(cfs_expr_list **a)
+           then from python you'd call it: o1, o2 = foo() where o2 becomes
+           the out parameter*/
+        PyObject *o, *o2, *o3;
+        o = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), $*1_descriptor, SWIG_POINTER_OWN);
+        if ((!$result) || ($result == Py_None))
+                $result = o;
+        else
+        {
+                if(!PyTuple_Check($result))
+                {
+                        /* insert the original result in the tuple */
+                        o2 = $result;
+                        $result = PyTuple_New(1);
+                        PyTuple_SetItem($result, 0, o2);
+                }
+                o3 = PyTuple_New(1);
+                PyTuple_SetItem(o3, 0, o);
+                o2 = $result;
+                $result = PySequence_Concat(o2, o3);
+                Py_DECREF(o2);
+                Py_DECREF(o3);
+        }
+}
+
+/* typemap for handling array of character array output parameter */
+%typemap(in, numinputs=0) char *** (char **temp) {
+        temp = NULL;
+        $1 = &temp;
+}
+
+%typemap(argout) char *** {
+        /* The purpose of this typemap is to be able to handle out params
+           Ex: if the function being called is: rc = foo(char ***)
+           then from python you'd call it: o1, o2 = foo() where o2 becomes
+           the out parameter*/
+        PyObject *o, *o2, *o3;
+        o = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), $*1_descriptor, SWIG_POINTER_OWN);
+        if ((!$result) || ($result == Py_None))
+                $result = o;
+        else
+        {
+                if(!PyTuple_Check($result))
+                {
+                        /* insert the original result in the tuple */
+                        o2 = $result;
+                        $result = PyTuple_New(1);
+                        PyTuple_SetItem($result, 0, o2);
+                }
+                o3 = PyTuple_New(1);
+                PyTuple_SetItem(o3, 0, o);
+                o2 = $result;
+                $result = PySequence_Concat(o2, o3);
+                Py_DECREF(o2);
+                Py_DECREF(o3);
+        }
+}
+
+
+/* This input typemap declares that char** requires no input parameter.
+ * Instead, the address of a local char* is used to call the function.
+ */
+%typemap(in,numinputs=0) char** (char* tmp) %{
+    tmp = NULL;
+    $1 = &tmp;
+%}
+
+/* After the function is called, the char** parameter contains a malloc'ed
+ * char* pointer.
+ * Construct a Python Unicode object (I'm using Python 3) and append it to
+ * any existing return value for the wrapper.
+ */
+%typemap(argout) char** (PyObject* obj) %{
+    if (*$1 == NULL)
+       goto fail;
+    obj = PyUnicode_FromString(*$1);
+    $result = SWIG_Python_AppendOutput($result,obj);
+%}
+
+/* The malloc'ed pointer is no longer needed, so make sure it is freed. */
+%typemap(freearg) char** %{
+    if (*$1)
+       free(*$1);
+%}
+
+/* typemap for handling lnet_nid_t output parameter */
+%typemap(in,numinputs=0) lnet_nid_t ** (lnet_nid_t *temp) {
+        temp = NULL;
+        $1 = &temp;
+}
+
+%typemap(argout) lnet_nid_t ** {
+        /* The purpose of this typemap is to be able to handle out params
+           Ex: if the function being called is: rc = foo(lnet_nid_t **a)
+           then from python you'd call it: o1, o2 = foo() where o2 becomes
+           the out parameter*/
+        PyObject *o, *o2, *o3;
+        o = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), $*1_descriptor, SWIG_POINTER_OWN);
+        if ((!$result) || ($result == Py_None)) {
+               fprintf(stderr, "AMIR: %d\n", result);
+                $result = o;
+       } else
+        {
+                if(!PyTuple_Check($result))
+                {
+                       fprintf(stderr, "AMIR 2\n");
+                        /* insert the original result in the tuple */
+                        o2 = $result;
+                        $result = PyTuple_New(1);
+                        PyTuple_SetItem($result, 0, o2);
+                }
+               fprintf(stderr, "AMIR 3\n");
+                o3 = PyTuple_New(1);
+                PyTuple_SetItem(o3, 0, o);
+                o2 = $result;
+                $result = PySequence_Concat(o2, o3);
+                Py_DECREF(o2);
+                Py_DECREF(o3);
+        }
+}
+
+/* The malloc'ed pointer is no longer needed, so make sure it is freed. */
+%typemap(freearg) lnet_nid_t ** %{
+    if (*$1) {
+       free(*$1);
+    }
+%}
+
+/*
+ * This is an interesting type map to allow for passing python bytes to
+ * C function using char *
+%typemap(in) (char *yaml_bytes, int yaml_bytes_len) {
+       Py_ssize_t len;
+       PyBytes_AsStringAndSize($input, &$1, &len);
+       $2 = (int)len;
+}
+*/