Whamcloud - gitweb
LU-10973 lnet: LUTF infrastructure updates 77/44177/3
authorAmir Shehata <ashehata@whamcloud.com>
Mon, 5 Jul 2021 18:17:16 +0000 (11:17 -0700)
committerOleg Drokin <green@whamcloud.com>
Tue, 10 Aug 2021 08:06:37 +0000 (08:06 +0000)
Fix Agent management
Handle python failures properly.
Change default location for temporary files to be in /tmp/lutf

Test-Parameters: trivial
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: I4e37b6226dfa12de4b7a1f5bfd87f84e91ee1dda
Reviewed-on: https://review.whamcloud.com/44177
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Serguei Smirnov <ssmirnov@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/tests/lutf/src/liblutf_agent.c
lustre/tests/lutf/src/liblutf_connect.c
lustre/tests/lutf/src/lutf.c
lustre/tests/lutf/src/lutf.h
lustre/tests/lutf/src/lutf_agent.h
lustre/tests/lutf/src/lutf_listener.c
lustre/tests/lutf/src/lutf_listener.h
lustre/tests/lutf/src/lutf_python.c

index 970c6c6..ea35271 100644 (file)
 #include "lutf_agent.h"
 #include "lutf.h"
 #include "lutf_python.h"
+#include "lutf_listener.h"
 
 static pthread_mutex_t agent_array_mutex;
 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
+static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
 /* TODO: this is probably not thread safe */
 static char agent_state_str[128];
 
-extern bool g_agent_enable_hb;
-extern struct in_addr g_local_ip;
+static bool g_agent_enable_hb = true;
+static struct in_addr g_local_ip;
 
 #define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
 
@@ -33,15 +35,111 @@ char *get_local_ip()
        return inet_ntoa(g_local_ip);
 }
 
-void release_agent_blk(lutf_agent_blk_t *agent)
+/*
+ * insert_dead_agent_locked
+ *     Park an agent block in the first free slot of agent_dead_list.
+ *     The block is flagged LUTF_AGENT_STATE_DEAD and agent->id is
+ *     overwritten with the dead-list slot index, so from here on the id
+ *     indexes agent_dead_list rather than agent_live_list.
+ *
+ *     The visible caller, release_agent_blk(), holds both
+ *     agent_array_mutex and agent->mutex when calling this.
+ *     Asserts if every dead-list slot is already taken.
+ */
+static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
 {
-       /* release the agent blk mutex */
-       MUTEX_LOCK(&agent->mutex);
-       if (agent) {
-               assert(agent->ref_count != 0);
-               agent->ref_count--;
+       int i = 0;
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               if (agent_dead_list[i] == NULL) {
+                       agent->state |= LUTF_AGENT_STATE_DEAD;
+                       agent_dead_list[i] = agent;
+                       agent->id = i;
+                       break;
+               }
+       }
+       /* i only reaches MAX_NUM_AGENTS when no free slot was found */
+       assert(i < MAX_NUM_AGENTS);
+}
+
+/*
+ * del_dead_agent_locked
+ *     Drop one reference on an agent that sits on agent_dead_list. When
+ *     the last reference goes away the dead-list slot is cleared and the
+ *     block is poisoned and freed.
+ *
+ *     Both visible callers, release_dead_list_agents() and
+ *     release_agent_blk(), hold agent_array_mutex around this call.
+ *     agent->id is used as the dead-list index here.
+ */
+static void del_dead_agent_locked(lutf_agent_blk_t *agent)
+{
+       assert(agent &&
+              (agent->state & LUTF_AGENT_STATE_DEAD) &&
+              agent_dead_list[agent->id] != NULL &&
+              agent_dead_list[agent->id] == agent);
+
+       assert(agent->ref_count > 0);
+       agent->ref_count--;
+
+       if (agent->ref_count == 0) {
+               agent_dead_list[agent->id] = NULL;
+               /* memset() only uses the low byte of its fill argument,
+                * so spell out the 0xef poison byte instead of passing
+                * 0xdeadbeef, which does not fit in an int.
+                */
+               memset(agent, 0xef, sizeof(*agent));
+               free(agent);
        }
+}
+
+/*
+ * release_dead_list_agents
+ *     Walk agent_dead_list under agent_array_mutex. For every entry that
+ *     still carries LUTF_AGENT_NEED_LISTEN_CLEAN, clear that flag and
+ *     drop one reference through del_dead_agent_locked().
+ */
+void release_dead_list_agents(void)
+{
+       int slot;
+
+       MUTEX_LOCK(&agent_array_mutex);
+       for (slot = 0; slot < MAX_NUM_AGENTS; slot++) {
+               lutf_agent_blk_t *dead = agent_dead_list[slot];
+
+               if (dead == NULL)
+                       continue;
+               if (!(dead->state & LUTF_AGENT_NEED_LISTEN_CLEAN))
+                       continue;
+
+               dead->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
+               del_dead_agent_locked(dead);
+       }
+       MUTEX_UNLOCK(&agent_array_mutex);
+}
+
+/*
+ * agent_alive
+ *     Sample LUTF_AGENT_STATE_ALIVE from agent->state while holding
+ *     agent->mutex. Returns true when the flag is set.
+ */
+static inline bool agent_alive(lutf_agent_blk_t *agent)
+{
+       bool alive;
+
+       MUTEX_LOCK(&agent->mutex);
+       alive = (agent->state & LUTF_AGENT_STATE_ALIVE) != 0;
        MUTEX_UNLOCK(&agent->mutex);
+
+       return alive;
+}
+
+/*
+ * release_agent_blk
+ *     Drop one reference on an agent block.
+ *
+ *     agent: block to release, must not be NULL.
+ *     dead:  non-zero when the caller has decided the agent is gone
+ *            and it must leave the live list even though other
+ *            references remain.
+ *
+ *     Takes agent_array_mutex, then agent->mutex.
+ *     - Block not ALIVE: it is on the dead list; the reference is
+ *       dropped through del_dead_agent_locked().
+ *     - Last reference on a live block: the slot is cleared, the
+ *       connections are closed and the block is poisoned and freed.
+ *     - Other references remain and dead is set: the block moves to
+ *       the dead list and its connections are closed.
+ *     - Otherwise only the reference count changes.
+ */
+void release_agent_blk(lutf_agent_blk_t *agent, int dead)
+{
+       assert(agent);
+
+       MUTEX_LOCK(&agent_array_mutex);
+       MUTEX_LOCK(&agent->mutex);
+
+       if (!(agent->state & LUTF_AGENT_STATE_ALIVE)) {
+               MUTEX_UNLOCK(&agent->mutex);
+               del_dead_agent_locked(agent);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return;
+       }
+
+       /* sanity check */
+       assert(agent_live_list[agent->id] != NULL &&
+              agent_live_list[agent->id] == agent);
+
+       assert(agent->ref_count > 0);
+       agent->ref_count--;
+
+       if (agent->ref_count == 0) {
+               agent_live_list[agent->id] = NULL;
+               assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               close_agent_connection(agent);
+               /* memset() only uses the low byte of its fill argument,
+                * so spell out the 0xef poison byte.
+                */
+               memset(agent, 0xef, sizeof(*agent));
+               /* free the block */
+               free(agent);
+       } else if (dead) {
+               agent_live_list[agent->id] = NULL;
+               insert_dead_agent_locked(agent);
+               /* Clear the live/connected flags before dropping the
+                * locks. agent->id now indexes the dead list, so a
+                * concurrent release that still saw ALIVE would check
+                * agent_live_list with the wrong index.
+                */
+               agent->state &= ~(LUTF_AGENT_STATE_ALIVE |
+                                 LUTF_AGENT_RPC_CHANNEL_CONNECTED |
+                                 LUTF_AGENT_HB_CHANNEL_CONNECTED);
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               close_agent_connection(agent);
+       } else {
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+       }
 }
 
 void acquire_agent_blk(lutf_agent_blk_t *agent)
@@ -67,8 +165,7 @@ char *agent_state2str(lutf_agent_blk_t *agent)
        return agent_state_str;
 }
 
-static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
-                                               struct sockaddr_in *addr)
+static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
 {
        int i;
        lutf_agent_blk_t *agent;
@@ -78,10 +175,11 @@ static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
 
        MUTEX_LOCK(&agent_array_mutex);
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
-               agent = list[i];
-               if ((agent) &&
+               agent = agent_live_list[i];
+               if ((agent) && agent_alive(agent) &&
                    (agent->addr.sin_addr.s_addr ==
                     addr->sin_addr.s_addr)) {
+                       acquire_agent_blk(agent);
                        MUTEX_UNLOCK(&agent_array_mutex);
                        return agent;
                }
@@ -102,7 +200,7 @@ int get_next_active_agent(int idx, lutf_agent_blk_t **out)
        MUTEX_LOCK(&agent_array_mutex);
        for (i = idx; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if (agent) {
+               if (agent && agent_alive(agent)) {
                        i++;
                        acquire_agent_blk(agent);
                        break;
@@ -119,13 +217,11 @@ out:
+/*
+ * find_create_agent_blk_by_addr
+ *     Return the live agent block matching addr, or hand off to
+ *     find_free_agent_blk() when no live agent matches.
+ *
+ *     find_agent_blk_by_addr() acquires the block it returns; that
+ *     reference is dropped again here before returning, so an existing
+ *     block comes back without a reference taken on the caller's behalf.
+ *     NOTE(review): confirm callers do not rely on holding a reference.
+ */
 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
 {
        lutf_agent_blk_t *agent;
-       agent = find_agent_blk_by_addr(agent_live_list, addr);
+
+       agent = find_agent_blk_by_addr(addr);
        if (!agent)
                return find_free_agent_blk(addr);
-
-       MUTEX_LOCK(&agent_array_mutex);
-       acquire_agent_blk(agent);
-       MUTEX_UNLOCK(&agent_array_mutex);
+       release_agent_blk(agent, false);
 
        return agent;
 }
@@ -161,6 +257,11 @@ void agent_enable_hb(void)
        g_agent_enable_hb = true;
 }
 
+/* Report the heartbeat switch as an int: 1 when enabled, 0 otherwise. */
+int agent_get_hb(void)
+{
+       return g_agent_enable_hb ? 1 : 0;
+}
+
 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
 {
        int i = 0;
@@ -170,8 +271,10 @@ lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
        MUTEX_LOCK(&agent_array_mutex);
 
        /* iterate through the array to find a free entry */
-       while (agent_live_list[i] != NULL)
-               i++;
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               if (agent_live_list[i] == NULL)
+                       break;
+       }
 
        if (i >= MAX_NUM_AGENTS) {
                MUTEX_UNLOCK(&agent_array_mutex);
@@ -226,7 +329,10 @@ lutf_agent_blk_t *find_agent_blk_by_id(int idx)
 
        agent = agent_live_list[idx];
 
-       acquire_agent_blk(agent);
+       if (agent_alive(agent))
+               acquire_agent_blk(agent);
+       else
+               agent = NULL;
 
        /* release the array mutex */
        MUTEX_UNLOCK(&agent_array_mutex);
@@ -244,53 +350,9 @@ void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
 
+/*
+ * Clear the given state bits on the agent while holding agent->mutex.
+ * After this change the function only updates the flags; it no longer
+ * frees the block as a side effect.
+ */
 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
 {
-       bool zombie = false;
-
        MUTEX_LOCK(&agent->mutex);
        agent->state &= ~state;
-       if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
-           (agent->state & LUTF_AGENT_ZOMBIE))
-               zombie = true;
-       MUTEX_UNLOCK(&agent->mutex);
-
-       if (zombie)
-               free_agent_blk(agent->id);
-}
-
-void free_agent_blk(int id)
-{
-       lutf_agent_blk_t *agent;
-
-       /* grab the array mutex */
-       MUTEX_LOCK(&agent_array_mutex);
-
-       /* if the blk is non null grab the mutex.
-        * possibly block until previous user is done
-        */
-       if (agent_live_list[id] == NULL) {
-               MUTEX_UNLOCK(&agent_array_mutex);
-               return;
-       }
-
-       agent = agent_live_list[id];
-
-       MUTEX_LOCK(&agent->mutex);
-       if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
-               MUTEX_UNLOCK(&agent->mutex);
-               MUTEX_UNLOCK(&agent_array_mutex);
-               PDEBUG("delay deleting agent %s\n", agent->name);
-               set_agent_state(agent, LUTF_AGENT_ZOMBIE);
-               return;
-       }
        MUTEX_UNLOCK(&agent->mutex);
-
-       agent_live_list[id] = NULL;
-
-       /* release the array mutex */
-       MUTEX_UNLOCK(&agent_array_mutex);
-
-       /* free the block */
-       free(agent);
 }
 
 char *agent_ip2str(lutf_agent_blk_t *agent)
@@ -305,6 +367,7 @@ int get_num_agents(void)
 {
        int i;
        int num = 0;
+
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                if (agent_live_list[i] != NULL)
                        num++;
@@ -313,8 +376,7 @@ int get_num_agents(void)
        return num;
 }
 
-/* no lock version of the function */
-static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
+lutf_agent_blk_t *find_agent_blk_by_name(char *name)
 {
        lutf_agent_blk_t *agent;
        int i;
@@ -326,10 +388,13 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
 
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if ((agent) &&
+               if ((agent) && agent_alive(agent) &&
                    ((strcmp(agent->name, name) == 0) ||
                     (strcmp(name, TEST_ROLE_GRC) == 0))) {
+                       acquire_agent_blk(agent);
                        break;
+               } else {
+                       agent = NULL;
                }
        }
 
@@ -339,18 +404,6 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
        return agent;
 }
 
-lutf_agent_blk_t *find_agent_blk_by_name(char *name)
-{
-       lutf_agent_blk_t *agent;
-
-       agent = find_agent_blk_by_name_nl(name);
-       if (agent)
-               acquire_agent_blk(agent);
-
-       /* return the agent blk */
-       return agent;
-}
-
 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
 {
        lutf_agent_blk_t *agent;
@@ -367,9 +420,12 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
 
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if ((agent) && (agent->addr.sin_addr.s_addr ==
+               if ((agent) && agent_alive(agent) &&
+                   (agent->addr.sin_addr.s_addr ==
                                addr.sin_addr.s_addr))
                        break;
+               else
+                       agent = NULL;
        }
 
        if (agent)
@@ -382,39 +438,6 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
        return agent;
 }
 
-void agent_hb_check(struct timeval *t, lutf_type_t me)
-{
-       lutf_agent_blk_t *agent;
-       int i;
-
-       /* grab the array mutex */
-       MUTEX_LOCK(&agent_array_mutex);
-
-       for (i = 0; i < MAX_NUM_AGENTS; i++) {
-               agent = agent_live_list[i];
-               if (agent && agent->node_type != me) {
-                       acquire_agent_blk(agent);
-                       if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
-                               int agent_id = agent->id;
-                               /* agent didn't send a HB move to dead
-                                * list
-                                */
-                               PERROR("agent %s presumed dead", agent->name);
-                               release_agent_blk(agent);
-                               MUTEX_UNLOCK(&agent_array_mutex);
-                               /* free_agent_blk() grabs the mutex */
-                               free_agent_blk(agent_id);
-                               MUTEX_LOCK(&agent_array_mutex);
-                               continue;
-                       }
-                       release_agent_blk(agent);
-               }
-       }
-
-       /* release the array mutex */
-       MUTEX_UNLOCK(&agent_array_mutex);
-}
-
 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
 {
        struct timeval start;
@@ -438,12 +461,13 @@ lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
                PDEBUG("Waiting for Agents");
                while (cYAML_get_next_seq_item(agents, &a) != NULL) {
                        PDEBUG("Looking up: %s", a->cy_valuestring);
-                       if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
+                       agent = find_agent_blk_by_name(a->cy_valuestring);
+                       if (agent) {
+                               PDEBUG("agent %s found\n", agent->name);
+                               release_agent_blk(agent, false);
+                       } else {
                                found = false;
                                break;
-                       } else {
-                               PDEBUG("agent %s found\n", agent->name);
-                               release_agent_blk(agent);
                        }
                }
                if (!found)
@@ -484,7 +508,8 @@ int get_num_agents_remote(char *masterIP, int masterPort)
                                                htons(masterPort),
                                                false, false);
        if (remoteSocket < 0) {
-               PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
+               PERROR("establishTCPConnection failure: %s",
+                      lutf_rc2str(remoteSocket));
                rc = remoteSocket;
                goto out;
        }
@@ -547,7 +572,9 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
                       agent_blk->name,
                       inet_ntoa(agent_blk->addr.sin_addr),
                       agent_blk->listen_port);
-               /* in network byte order, convert so we can have a uniform API */
+               /* in network byte order, convert so we can have a
+                * uniform API
+                */
                agent_blk->iRpcFd = establishTCPConnection(
                                agent_blk->addr.sin_addr.s_addr,
                                htons(agent_blk->listen_port),
@@ -591,7 +618,8 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
                goto fail_rpc;
        }
 
-       rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
+       rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
+                           timeout);
        if (rc != EN_LUTF_RC_OK) {
                PERROR("Failed to recv rpc body in timeout %d", timeout);
                goto fail_rpc;
@@ -604,15 +632,15 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
         * appropriately.
         */
        *rsp = recvBuf;
-       release_agent_blk(agent_blk);
-
        unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       release_agent_blk(agent_blk, false);
 
        return EN_LUTF_RC_OK;
 
 fail_rpc:
-       release_agent_blk(agent_blk);
        unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
+       release_agent_blk(agent_blk, true);
        if (recvBuf)
                free(recvBuf);
        msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
@@ -640,13 +668,14 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
        lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
        lutf_agent_blk_t *agent_blk;
        int msg_size;
+       bool dead = false;
 
        if (!agent || !yaml)
                goto out;
 
        msg_size = strlen(yaml) + 1;
 
-       agent_blk = find_agent_blk_by_name_nl(agent);
+       agent_blk = find_agent_blk_by_name(agent);
        if (!agent_blk) {
                PERROR("Can't find agent with name: %s", agent);
                goto out;
@@ -657,14 +686,21 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
                MUTEX_UNLOCK(&agent_blk->mutex);
                PERROR("agent_blk %s doesn't have an RPC channel",
                       agent_blk->name);
-               goto out;
+               goto release_agent;
        }
        MUTEX_UNLOCK(&agent_blk->mutex);
 
+       set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
        PDEBUG("sending rpc response\n%s", yaml);
        rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
                           EN_MSG_TYPE_RPC_RESPONSE);
-
+       if (rc)
+               dead = true;
+release_agent:
+       unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       release_agent_blk(agent_blk, dead);
+       if (dead)
+               set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
 out:
        return rc;
 }
index 796571a..a14cbbd 100644 (file)
@@ -24,7 +24,8 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
        fd_set rset, wset;
        struct timeval tval;
 
-       if ((iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen)) < 0) {
+       iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen);
+       if (iN < 0) {
                if (errno != EINPROGRESS) {
                        PERROR("Connect Failed: %s:%d", strerror(errno), errno);
                        return EN_LUTF_RC_FAIL;
@@ -38,8 +39,9 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
                tval.tv_sec = iNsec;
                tval.tv_usec = 0;
 
-               if ((iN = select(iSockFd+1, &rset, &wset, NULL,
-                                iNsec ? &tval : NULL)) == 0) {
+               iN = select(iSockFd+1, &rset, &wset, NULL,
+                           iNsec ? &tval : NULL);
+               if (iN == 0) {
                        errno = ETIMEDOUT;
                        PERROR("Select timed out");
                        return EN_LUTF_RC_FAIL;
@@ -50,8 +52,10 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
 
                if (FD_ISSET(iSockFd, &rset) || FD_ISSET(iSockFd, &wset)) {
                        iLen = sizeof(iError);
-                       if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError, (socklen_t *)&iLen) < 0) {
-                               PERROR("getsockopt failed indicating connect failure, errno= %d", errno);
+                       if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError,
+                                      (socklen_t *)&iLen) < 0) {
+                               PERROR("getsockopt failed indicating connect failure, errno= %d",
+                                      errno);
                                return EN_LUTF_RC_FAIL;
                        }
                } else {
@@ -81,8 +85,8 @@ int establishTCPConnection(unsigned long uiAddress,
        lutf_rc_t eRc = EN_LUTF_RC_OK;
 
        /* Create TCP socket */
-       if ((rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))
-            == -1)
+       rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+       if (rsocket == -1)
                return EN_LUTF_RC_FAIL;
 
        /* Turn off Nagle's algorithm for this TCP socket. */
@@ -109,11 +113,9 @@ int establishTCPConnection(unsigned long uiAddress,
        tm_addr.sin_port = (endian) ? htons(iPort) : iPort;
        tm_addr.sin_family = AF_INET;
 
-       if ((eRc = doNonBlockingConnect(rsocket,
-                                       (struct sockaddr *)&tm_addr,
-                                       sizeof(tm_addr),
-                                       SOCKET_CONN_TIMEOUT_SEC))
-           != EN_LUTF_RC_OK) {
+       eRc = doNonBlockingConnect(rsocket, (struct sockaddr *)&tm_addr,
+                                  sizeof(tm_addr), SOCKET_CONN_TIMEOUT_SEC);
+       if (eRc != EN_LUTF_RC_OK) {
                close(rsocket);
                return eRc;
        }
@@ -127,7 +129,7 @@ lutf_rc_t closeTcpConnection(int iTcpSocket)
 
        PDEBUG("closing socket %d", iTcpSocket);
        rc = close(iTcpSocket);
-       if (!rc && errno != EINPROGRESS && errno != ECONNRESET) {
+       if (rc && errno != EINPROGRESS && errno != ECONNRESET) {
                PERROR("failed to close %d:%d\n", iTcpSocket, errno);
                return EN_LUTF_RC_FAIL;
        }
@@ -275,7 +277,9 @@ lutf_rc_t readTcpMessage(int iFd, char *pcBuffer,
 
                if (tNread < 0) {
                        if (errno == EINTR) {
-                               /*  We were interrupted, but this is not an error condition.  */
+                               /* We were interrupted, but this is not an
+                                * error condition.
+                                */
                                tNread = 0;
                        } else if ((errno == EAGAIN) && (!iTimeout)) {
                                return EN_LUTF_RC_SOCKET_FAIL;
index e174fe9..4ed1bf0 100644 (file)
 
 #define HB_TIMEOUT     2
 
-struct in_addr g_local_ip;
 FILE *out;
 char *outlog;
 
-/*externs needed by getopt lib*/
-extern char *optarg;
-extern int optind;
-
 static void
-lutf_help_usage(const struct option *long_options, const char *description[])
+lutf_help_usage(const struct option *long_options, const char *const description[])
 {
        int i = 0;
 
@@ -87,7 +82,8 @@ lutf_rc_t hostname_to_ip(char *hostname, char *ip, int len)
        hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
        hints.ai_socktype = SOCK_STREAM;
 
-       if ((rv = getaddrinfo(hostname, "http", &hints, &servinfo)) != 0) {
+       rv = getaddrinfo(hostname, "http", &hints, &servinfo);
+       if (rv != 0) {
                PERROR("getaddrinfo: %s\n", gai_strerror(rv));
                return EN_LUTF_RC_BAD_ADDR;
        }
@@ -182,15 +178,14 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree,
                                /* maybe it's a host name so let's try
                                 * that out
                                 */
-                               rc = EN_LUTF_RC_BAD_ADDR;
-                               if ((rc = hostname_to_ip(tmp->cy_valuestring, maddr,
-                                                        sizeof(maddr)))
-                                   != EN_LUTF_RC_OK) {
+                               rc = hostname_to_ip(tmp->cy_valuestring, maddr,
+                                                   sizeof(maddr));
+                               if (rc != EN_LUTF_RC_OK) {
                                        *elem = "master-address";
                                        return rc;
                                } else if (!inet_aton(maddr, &addr)) {
                                        *elem = "master-address";
-                                       return rc;
+                                       return EN_LUTF_RC_BAD_ADDR;
                                }
                        }
                        cfg->l_info.hb_info.master_address.sin_addr = addr;
@@ -244,8 +239,8 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree,
        tmp = get_value(head, "node-name");
        if (tmp) {
                if (tmp->cy_type == CYAML_TYPE_STRING) {
-                       strncpy(cfg->l_info.hb_info.node_name, tmp->cy_valuestring,
-                               MAX_STR_LEN);
+                       strncpy(cfg->l_info.hb_info.node_name,
+                               tmp->cy_valuestring, MAX_STR_LEN);
                        cfg->l_info.hb_info.node_name[MAX_STR_LEN - 1] = '\0';
                } else {
                        *elem = "node-name";
@@ -268,6 +263,19 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree,
                return EN_LUTF_RC_MISSING_PARAM;
        }
 
+       tmp = get_value(head, "suite-list");
+       if (tmp && cfg->l_info.type == EN_LUTF_MASTER) {
+               if (tmp->cy_type == CYAML_TYPE_STRING)
+                       if (strlen(tmp->cy_valuestring) > 0)
+                               cfg->suite_list = tmp->cy_valuestring;
+                       else
+                               cfg->suite_list = NULL;
+               else {
+                       *elem = "suite-list";
+                       return EN_LUTF_RC_BAD_PARAM;
+               }
+       }
+
        tmp = get_value(head, "suite");
        if (tmp && cfg->l_info.type == EN_LUTF_MASTER) {
                if (tmp->cy_type == CYAML_TYPE_STRING)
@@ -323,7 +331,7 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree,
                        return EN_LUTF_RC_BAD_PARAM;
                }
        } else {
-               cfg->results_file = "lutf_def_results";
+               cfg->results_file = "/tmp/lutf/lutf_def_results";
        }
 
        tmp = get_value(head, "agent-list");
@@ -377,7 +385,7 @@ main(int argc, char *argv[])
                {NULL, 0, NULL, 0}
        };
 
-       const char *description[] = {
+       static const char * const description[] = {
                /*'c'*/":\n\t\tYAML config file",
                /*'h'*/":\n\t\tPrint this help",
                NULL
index 79bd62d..65546cb 100644 (file)
@@ -74,6 +74,8 @@ typedef struct lutf_config_params_s {
        char *py_path; /* other python specific paths */
        char *master_name; /* name of master. Important if I'm an agent */
        char *suite; /* name of suite to run. Run all if not present */
+       char *suite_list; /* list of suites to run. Takes precedence
+                            over single suite parameter */
        char *script; /* name of script to run. Suite must be specified */
        char *pattern; /* file match pattern */
        char *results_file; /* path to results file */
@@ -85,9 +87,9 @@ typedef struct lutf_config_params_s {
 
 lutf_config_params_t g_lutf_cfg;
 
-static inline char *lutf_rc2str(lutf_rc_t rc)
+static inline const char *lutf_rc2str(lutf_rc_t rc)
 {
-       char *str[] = {
+       static const char * const str[] = {
                [EN_LUTF_RC_OK] = "RC_OK",
                [EN_LUTF_RC_FAIL*-1] = "RC_FAIL",
                [EN_LUTF_RC_SYS_ERR*-1] = "RC_SYSTEM_ERROR",
index f509004..7e769b9 100644 (file)
@@ -12,7 +12,8 @@ struct cYAML;
 #define LUTF_AGENT_HB_CHANNEL_CONNECTED (1 << 1)
 #define LUTF_AGENT_RPC_CHANNEL_CONNECTED (1 << 2)
 #define LUTF_AGENT_WORK_IN_PROGRESS (1 << 3)
-#define LUTF_AGENT_ZOMBIE (1 << 4)
+#define LUTF_AGENT_NEED_LISTEN_CLEAN (1 << 4)
+#define LUTF_AGENT_STATE_DEAD (1 << 5)
 
 typedef struct lutf_agent_blk_s {
        pthread_mutex_t mutex;
@@ -88,12 +89,6 @@ lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr);
 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr);
 
 /*
- * free_agent_blk
- *     Free an agent blk that no longer is needed
- */
-void free_agent_blk(int id);
-
-/*
  * acquire_agent_blk
  *     acquire the agent for work
  */
@@ -103,7 +98,7 @@ void acquire_agent_blk(lutf_agent_blk_t *agent);
  * release_agent_blk
  *     Release the agent blk
  */
-void release_agent_blk(lutf_agent_blk_t *agent);
+void release_agent_blk(lutf_agent_blk_t *agent, int dead);
 
 /*
  * agent_ip2str
@@ -112,13 +107,6 @@ void release_agent_blk(lutf_agent_blk_t *agent);
 char *agent_ip2str(lutf_agent_blk_t *agent);
 
 /*
- * agent_hb_check
- *     Given a time struct insure that the agent doesn't exceed the HB
- *     time.
- */
-void agent_hb_check(struct timeval *t, lutf_type_t whoami);
-
-/*
  * agent_disable_hb
  *     Disables the HB
  */
@@ -131,6 +119,12 @@ void agent_disable_hb(void);
 void agent_enable_hb(void);
 
 /*
+ * agent_get_hb
+ *     Get current HB state
+ */
+int agent_get_hb(void);
+
+/*
  * get the number of registered agents
  */
 int get_num_agents(void);
index 51690b6..781e52e 100644 (file)
@@ -18,9 +18,9 @@
 #include "lutf_listener.h"
 
 static fd_set g_tAllSet;
-static bool g_bShutdown = false;
+static bool g_bShutdown;
 static int g_iListenFd = INVALID_TCP_SOCKET;
-bool g_agent_enable_hb = true;
+static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
 
 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
 
@@ -181,14 +181,16 @@ static lutf_rc_t init_comm(unsigned short server_port)
        sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        sServAddr.sin_port = htons(server_port);
 
-       if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) {
+       if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
+                sizeof(sServAddr)) < 0) {
                /*  Cannot bind our listening socket.  */
                closeTcpConnection(g_iListenFd);
                return EN_LUTF_RC_BIND_FAILED;
        }
 
-       /*  Let the system know we wish to listen to this port for
-        *  connections. */
+       /* Let the system know we wish to listen to this port for
+        * connections.
+        */
        if (listen(g_iListenFd, 2) < 0) {
                /*  Cannot listen to socket, close and fail  */
                closeTcpConnection(g_iListenFd);
@@ -209,19 +211,6 @@ static lutf_rc_t init_comm(unsigned short server_port)
        return EN_LUTF_RC_OK;
 }
 
-static inline int close_agent_connection(lutf_agent_blk_t *agent)
-{
-       FD_CLR(agent->iFileDesc, &g_tAllSet);
-       FD_CLR(agent->iRpcFd, &g_tAllSet);
-       closeTcpConnection(agent->iRpcFd);
-       closeTcpConnection(agent->iFileDesc);
-       agent->state &=
-               ~LUTF_AGENT_RPC_CHANNEL_CONNECTED;
-       agent->state &=
-               ~LUTF_AGENT_HB_CHANNEL_CONNECTED;
-       return get_highest_fd();
-}
-
 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
                  int type)
 {
@@ -250,8 +239,7 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
        /* we assume the first connection is an HB connection */
        if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
                if (agent->iFileDesc != INVALID_TCP_SOCKET) {
-                       PERROR("agent in unexpected state. "
-                              "state is %s, but HB FD is %d",
+                       PERROR("agent in unexpected state. state is %s, but HB FD is %d",
                               agent_state2str(agent), fd);
                        return EN_LUTF_RC_SYS_ERR;
                } else {
@@ -262,8 +250,7 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
                }
        } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
                if (agent->iRpcFd != INVALID_TCP_SOCKET) {
-                       PERROR("agent in unexpected state. "
-                              "state is %s, but RPC FD is %d",
+                       PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
                               agent_state2str(agent), fd);
                        return EN_LUTF_RC_SYS_ERR;
                } else {
@@ -279,6 +266,44 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
        return EN_LUTF_RC_SYS_ERR;
 }
 
+void close_agent_connection(lutf_agent_blk_t *agent)
+{
+       if (agent->iFileDesc != INVALID_TCP_SOCKET) {
+               FD_CLR(agent->iFileDesc, &g_tAllSet);
+               closeTcpConnection(agent->iFileDesc);
+               agent->iFileDesc = -1;
+       }
+       if (agent->iRpcFd != INVALID_TCP_SOCKET) {
+               FD_CLR(agent->iRpcFd, &g_tAllSet);
+               closeTcpConnection(agent->iRpcFd);
+               agent->iRpcFd = -1;
+       }
+       g_iMaxSelectFd = get_highest_fd();
+}
+
+void agent_hb_check(struct timeval *t, lutf_type_t me)
+{
+       lutf_agent_blk_t *agent;
+       int i = 0;
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               agent = find_agent_blk_by_id(i);
+
+               if (agent && agent->node_type != me) {
+                       if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
+                               /* agent didn't send a HB in time; move
+                                * it to the dead list
+                                */
+                               PERROR("agent %s presumed dead", agent->name);
+                               release_agent_blk(agent, true);
+                               continue;
+                       }
+               }
+               if (agent)
+                       release_agent_blk(agent, false);
+       }
+}
+
 /*
  * lutf_listener_main
  *   main loop.  Listens for incoming agent connections, and for agent
@@ -300,7 +325,6 @@ void *lutf_listener_main(void *usr_data)
        socklen_t  tCliLen;
        fd_set tReadSet;
        int iNReady;
-       int iMaxSelectFd;
        int i;
        lutf_rc_t rc;
        lutf_agent_blk_t *agent = NULL, *master = NULL;
@@ -323,287 +347,286 @@ void *lutf_listener_main(void *usr_data)
 
        agent_init();
 
-       iMaxSelectFd = g_iListenFd;
-
-       gettimeofday(&time_1, NULL);
+       g_iMaxSelectFd = g_iListenFd;
 
+       gettimeofday(&time_2, NULL);
 
-       /*  Main Processing Loop: Keep going until we have reason to shutdown. */
+       /*  Main Processing Loop: Keep going until we have reason
+        * to shut down.
+        */
        while (!g_bShutdown) {
                /*  Wait on our select mask for an event to occur.  */
-               tReadSet = g_tAllSet;
-
                select_to.tv_sec = HB_TO;
                select_to.tv_usec = 0;
 
-               iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to);
+               FD_ZERO(&tReadSet);
+               tReadSet = g_tAllSet;
+               iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
+                                &select_to);
+
+               release_dead_list_agents();
 
                /*  Determine if we failed the select call */
                if (iNReady < 0) {
                        /*  Check to see if we were interrupted by a signal.  */
                        if ((errno == EINTR) || (errno == EAGAIN)) {
                                PERROR("Select failure: errno = %d", errno);
-                       } else {
-                               /*  If this is an ECONNABORTED error, just ignore it.  */
-                               if (errno != ECONNABORTED) {
-                                       /* Raise a fatal alarm.  */
-                                       /* Shut down */
-                                       PERROR("Shutting down Listener thread. errno: %d", errno);
-                                       g_bShutdown = true;
-                               }
+                       } else if (errno != ECONNABORTED) {
+                               /* ECONNABORTED is ignored. Any other
+                                * error is fatal: log it and shut the
+                                * listener down.
+                                */
+                               PERROR("Shutting down Listener thread. errno: %d",
+                                      errno);
+                               lutf_listener_shutdown();
                        }
-               } else {
-                       if (FD_ISSET(g_iListenFd, &tReadSet)) {
-                               /*  A new client is trying to connect.  */
-                               tCliLen = sizeof(sCliAddr);
-                               iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr,
-                                                &tCliLen);
-                               if (iConnFd < 0) {
-                                       /*  Cannot accept new connection...just ignore.  */
-                                       if (errno != EWOULDBLOCK)
-                                               PERROR("Error on accept(), errno = %d", errno);
-                               } else {
-                                       /* Try to see if we have an agent
-                                        * with the same address, since
-                                        * agents can have multiple tcp
-                                        * connections open
-                                        */
-                                       agent = find_create_agent_blk_by_addr(&sCliAddr);
-                                       if (!agent) {
-                                               /*  Cannot support more clients...just ignore.  */
-                                               PERROR("Cannot accept more clients");
-                                               closeTcpConnection(iConnFd);
-                                       } else {
-                                               int iOption, iFlags;
-
-                                               rc = complete_agent_connection(agent,
-                                                               iConnFd);
-                                               if (rc != EN_LUTF_RC_OK) {
-                                                       int agent_id = agent->id;
-                                                       iMaxSelectFd = close_agent_connection(agent);
-                                                       release_agent_blk(agent);
-                                                       free_agent_blk(agent_id);
-                                                       continue;
-                                               }
 
-                                               /* all nodes listen on the
-                                                * same port
-                                                */
-                                               agent->listen_port = info->listen_port;
-
-                                               /*  Add new client to our select mask.  */
-                                               FD_SET(iConnFd, &g_tAllSet);
-                                               iMaxSelectFd = get_highest_fd();
-
-                                               /* Ok, it seems that the connected socket gains
-                                                * the same flags as the listen socket.  We want
-                                                * to make it blocking here.
-                                                */
-                                               iFlags = fcntl(iConnFd, F_GETFL, 0);
-                                               fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
-
-                                               /*  And, we want to turn off Nagle's algorithm to
-                                                *  reduce latency
-                                                */
-                                               iOption = 1;
-                                               setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
-                                                          (void *)&iOption,
-                                                          sizeof(iOption));
-
-                                               PDEBUG("Received a connection from %s on FD %d\n",
-                                                      inet_ntoa(agent->addr.sin_addr), iConnFd);
-
-                                               release_agent_blk(agent);
-                                       }
-                               }
+                       /* store the current time */
+                       time_1 = time_2;
 
-                               /*  See if there are other messages waiting.  */
-                               iNReady--;
-                       }
+                       /* Zero out the g_tAllSet */
+                       FD_ZERO(&g_tAllSet);
 
-                       /* need to iterate through the clients and see if a
-                        * message was sent to any of them
-                        */
-                       for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
-                               /* reset the return code to avoid misbehaving on previous
-                                * returns
+                       continue;
+               }
+
+               if (FD_ISSET(g_iListenFd, &tReadSet)) {
+                       /*  A new client is trying to connect.  */
+                       tCliLen = sizeof(sCliAddr);
+                       iConnFd = accept(g_iListenFd,
+                                        (struct sockaddr *) &sCliAddr,
+                                        &tCliLen);
+                       if (iConnFd < 0) {
+                               /*  Cannot accept new connection...
+                                * just ignore.
                                 */
-                               rc = EN_LUTF_RC_OK;
+                               if (errno != EWOULDBLOCK)
+                                       PERROR("Error on accept(), errno = %d",
+                                              errno);
+                       } else {
+                               /* Try to see if we have an agent
+                                * with the same address, since
+                                * agents can have multiple tcp
+                                * connections open
+                                */
+                               agent = find_create_agent_blk_by_addr(&sCliAddr);
+                               if (!agent) {
+                                       /*  Cannot support more clients...just ignore.  */
+                                       PERROR("Cannot accept more clients");
+                                       closeTcpConnection(iConnFd);
+                               } else {
+                                       int iOption, iFlags;
 
-                               if ((agent = find_agent_blk_by_id(i))) {
-                                       int hb_fd = INVALID_TCP_SOCKET;
-                                       int rpc_fd = INVALID_TCP_SOCKET;
+                                       rc = complete_agent_connection(agent,
+                                                                      iConnFd);
+                                       if (rc != EN_LUTF_RC_OK) {
+                                               release_agent_blk(agent, true);
+                                               continue;
+                                       }
 
-                                       if (FD_ISSET(agent->iFileDesc, &tReadSet))
-                                               hb_fd = agent->iFileDesc;
-                                       if (FD_ISSET(agent->iRpcFd, &tReadSet))
-                                               rpc_fd = agent->iRpcFd;
+                                       /* all nodes listen on the
+                                        * same port
+                                        */
+                                       agent->listen_port = info->listen_port;
 
-                                       if (hb_fd == INVALID_TCP_SOCKET &&
-                                           rpc_fd == INVALID_TCP_SOCKET)
-                                               continue;
+                                       /*  Add new client to our select mask.  */
+                                       FD_SET(iConnFd, &g_tAllSet);
+                                       g_iMaxSelectFd = get_highest_fd();
 
-                                       /* process heart beat */
-                                       if (hb_fd != INVALID_TCP_SOCKET) {
-                                               /* process the message */
-                                               rc = process_agent_message(agent, hb_fd);
-                                               if (rc)
-                                                       PERROR("msg failure: %s",
-                                                              lutf_rc2str(rc));
-                                       }
-                                       if (rc == EN_LUTF_RC_SOCKET_FAIL) {
-                                               int agent_id = agent->id;
-                                               if (agent->id == master->id) {
-                                                       PERROR("Disconnected from master. Will attempt to reconnect");
-                                                       master_connected = false;
-                                               }
-                                               iMaxSelectFd = close_agent_connection(agent);
-                                               release_agent_blk(agent);
-                                               free_agent_blk(agent_id);
-                                               continue;
-                                       }
+                                       /* Ok, it seems that the connected socket gains
+                                        * the same flags as the listen socket.  We want
+                                        * to make it blocking here.
+                                        */
+                                       iFlags = fcntl(iConnFd, F_GETFL, 0);
+                                       fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
 
-                                       /* process rpc */
-                                       if (rpc_fd != INVALID_TCP_SOCKET) {
-                                               /* process the message */
-                                               rc = process_agent_message(agent, rpc_fd);
-                                               if (rc)
-                                                       PERROR("msg failure: %s",
-                                                              lutf_rc2str(rc));
-                                       }
-                                       if (rc == EN_LUTF_RC_SOCKET_FAIL) {
-                                               int agent_id = agent->id;
-                                               if (agent->id == master->id) {
-                                                       PERROR("Disconnected from master. Will attempt to reconnect");
-                                                       master_connected = false;
-                                               }
-                                               iMaxSelectFd = close_agent_connection(agent);
-                                               release_agent_blk(agent);
-                                               free_agent_blk(agent_id);
-                                               continue;
-                                       }
-                                       release_agent_blk(agent);
+                                       /*  And, we want to turn off Nagle's algorithm to
+                                        *  reduce latency
+                                        */
+                                       iOption = 1;
+                                       setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
+                                                  (void *)&iOption,
+                                                  sizeof(iOption));
+
+                                       PDEBUG("Received a connection from %s on FD %d\n",
+                                              inet_ntoa(agent->addr.sin_addr), iConnFd);
                                }
                        }
 
-                       /* establish connection with the master if I'm an agent
-                        * and I have not connected to the master yet.
-                        * Otherwise send a heart beat
+                       /*  See if there are other messages waiting.  */
+                       iNReady--;
+               }
+
+               /* need to iterate through the clients and see if a
+                * message was sent to any of them
+                */
+               for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
+                       /* reset the return code to avoid misbehaving on previous
+                        * returns
                         */
-                       if (!master_connected &&
-                           strlen(g_lutf_cfg.master_name) != 0) {
-                               PDEBUG("Attempting a connection on master %s",
-                                      g_lutf_cfg.master_name);
-                               master = find_free_agent_blk(&info->hb_info.master_address);
-                               if (!master) {
-                                       PERROR("Failed to allocate agent block");
+                       rc = EN_LUTF_RC_OK;
+
+                       if ((agent = find_agent_blk_by_id(i))) {
+                               int hb_fd = INVALID_TCP_SOCKET;
+                               int rpc_fd = INVALID_TCP_SOCKET;
+
+                               release_agent_blk(agent, false);
+
+                               if (FD_ISSET(agent->iFileDesc, &tReadSet))
+                                       hb_fd = agent->iFileDesc;
+                               if (FD_ISSET(agent->iRpcFd, &tReadSet))
+                                       rpc_fd = agent->iRpcFd;
+
+                               if (hb_fd == INVALID_TCP_SOCKET &&
+                                   rpc_fd == INVALID_TCP_SOCKET)
                                        continue;
-                               }
 
-                               iConnFd = establishTCPConnection(
-                                       info->hb_info.master_address.sin_addr.s_addr,
-                                       htons(info->hb_info.master_address.sin_port),
-                                       true, false);
-
-                               if (iConnFd < 0) {
-                                       int master_id = master->id;
-
-                                       PERROR("establishTCPConnection failure: %s. Clearing set",
-                                               lutf_rc2str(iConnFd));
-                                       iMaxSelectFd = close_agent_connection(master);
-                                       release_agent_blk(master);
-                                       free_agent_blk(master_id);
-                                       PERROR("Disconnected from master. Will attempt to reconnect");
-                                       master_connected = false;
+                               /* process heart beat */
+                               if (hb_fd != INVALID_TCP_SOCKET) {
+                                       /* process the message */
+                                       rc = process_agent_message(agent, hb_fd);
+                                       if (rc)
+                                               PERROR("msg failure: %s",
+                                                      lutf_rc2str(rc));
+                               }
+                               if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+                                       if (agent->id == master->id) {
+                                               PERROR("Disconnected from master. Will attempt to reconnect");
+                                               master_connected = false;
+                                       }
+                                       release_agent_blk(agent, true);
                                        continue;
                                }
 
-                               master->iFileDesc = iConnFd;
-                               memcpy(&master->addr,
-                                      &info->hb_info.master_address,
-                                      sizeof(master->addr));
-                               strncpy(master->name, g_lutf_cfg.master_name,
-                                       MAX_STR_LEN);
-                               master->name[MAX_STR_LEN-1] = '\0';
-                               master->node_type = EN_LUTF_MASTER;
-                               gethostname(master->hostname, MAX_STR_LEN);
-                               master->telnet_port = info->hb_info.agent_telnet_port;
-                               release_agent_blk(master);
-
-                               PDEBUG("Connected to master %s on fd %d",
-                                      master->name, master->iFileDesc);
-
-                               /*
-                                * add the master FD to the select FD set
-                                * to be able to process master messages
-                                */
-                               FD_SET(iConnFd, &g_tAllSet);
-                               iMaxSelectFd = get_highest_fd();
+                               /* process rpc */
+                               if (rpc_fd != INVALID_TCP_SOCKET) {
+                                       /* process the message */
+                                       rc = process_agent_message(agent, rpc_fd);
+                                       if (rc)
+                                               PERROR("msg failure: %s",
+                                                      lutf_rc2str(rc));
+                               }
+                               if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+                                       if (agent->id == master->id) {
+                                               PERROR("Disconnected from master. Will attempt to reconnect");
+                                               master_connected = false;
+                                       }
+                                       release_agent_blk(agent, true);
+                                       continue;
+                               }
+                       }
+               }
 
-                               master_connected = true;
-                               master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+               /* establish connection with the master if I'm an agent
+                * and I have not connected to the master yet.
+                * Otherwise send a heart beat
+                */
+               if (!master_connected &&
+                   strlen(g_lutf_cfg.master_name) != 0) {
+                       PDEBUG("Attempting a connection on master %s",
+                              g_lutf_cfg.master_name);
+                       master = find_free_agent_blk(&info->hb_info.master_address);
+                       if (!master) {
+                               PERROR("Failed to allocate agent block");
+                               continue;
                        }
-/*
-                       if (info->type == EN_LUTF_AGENT) {
-                               rc = send_hb(master, info->hb_info.node_name,
-                                            info->hb_info.agent_telnet_port,
-                                            info->type);
-                               if (rc != EN_LUTF_RC_OK) {
-                                       master_connected = false;
-                                       iMaxSelectFd = get_highest_fd();
-                               }
+
+                       iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
+                                                        htons(info->hb_info.master_address.sin_port),
+                                                        true, false);
+                       if (iConnFd < 0) {
+                               PERROR("establishTCPConnection failure: %s. Clearing set",
+                                       lutf_rc2str(iConnFd));
+                               release_agent_blk(master, true);
+                               PERROR("Disconnected from master. Will attempt to reconnect");
+                               master_connected = false;
+                               continue;
                        }
-*/
+
+                       master->iFileDesc = iConnFd;
+                       memcpy(&master->addr,
+                              &info->hb_info.master_address,
+                              sizeof(master->addr));
+                       strncpy(master->name, g_lutf_cfg.master_name,
+                               MAX_STR_LEN);
+                       master->name[MAX_STR_LEN-1] = '\0';
+                       master->node_type = EN_LUTF_MASTER;
+                       gethostname(master->hostname, MAX_STR_LEN);
+                       master->telnet_port = info->hb_info.agent_telnet_port;
+
+                       PDEBUG("Connected to master %s on fd %d",
+                              master->name, master->iFileDesc);
+
                        /*
-                        * Get the time stamp and go through each agent
-                        * and see if it's still healthy.  For agents which
-                        * aren't healthy move off to the dead_list.
-                        * This operation is only valid if I'm a master
+                        * add the master FD to the select FD set
+                        * to be able to process master messages
                         */
-                       gettimeofday(&time_2, NULL);
-                       if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) {
-                               /* check if HB_TO seconds has passed since the last
-                                * time we collected the time */
-                               if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
-
-                                       /* do the heartbeat check */
-                                       agent_hb_check(&time_1, info->type);
-                               }
+                       FD_SET(iConnFd, &g_tAllSet);
+                       g_iMaxSelectFd = get_highest_fd();
+
+                       master_connected = true;
+                       master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+               }
+               /*
+               if (info->type == EN_LUTF_AGENT) {
+                       rc = send_hb(master, info->hb_info.node_name,
+                                    info->hb_info.agent_telnet_port,
+                                    info->type);
+                       if (rc != EN_LUTF_RC_OK) {
+                               master_connected = false;
+                               g_iMaxSelectFd = get_highest_fd();
                        }
+               }
+               */
+               /*
+                * Get the time stamp and go through each agent
+                * and see if it's still healthy.  For agents which
+                * aren't healthy move off to the dead_list.
+                * This operation is only valid if I'm a master
+                */
+               gettimeofday(&time_2, NULL);
+               if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
+                       /* check if HB_TO * 100 seconds have passed since the
+                        * last time we collected the time
+                        */
+                       if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
+                               /* do the heartbeat check */
+                               agent_hb_check(&time_1, info->type);
+                       }
+               }
+
+               if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
+                       lutf_agent_blk_t *agent = NULL;
+                       int idx = 0;
+
+                       do {
+                               idx = get_next_active_agent(idx, &agent);
+                               /* A master doesn't send a heartbeat
+                                * to itself
+                                */
+                               if (agent) {
+                                       bool dead = false;
+                                       if (info->type == EN_LUTF_MASTER &&
+                                           agent->id == master->id)
+                                               continue;
 
-                       if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
-                               lutf_agent_blk_t *agent = NULL;
-                               int idx = 0;
-
-                               do {
-                                       idx = get_next_active_agent(idx, &agent);
-                                       /* A master doesn't send a heart
-                                        * beat to himself */
-                                       if (agent) {
-                                               if (info->type == EN_LUTF_MASTER &&
-                                                   agent->id == master->id)
-                                                       continue;
-                                               int agent_id = agent->id;
-                                               rc = send_hb(agent, info->hb_info.node_name,
-                                                            info->hb_info.agent_telnet_port,
-                                                            info->type);
-                                               if (rc != EN_LUTF_RC_OK) {
-                                                       if (agent->id == master->id) {
-                                                               PERROR("Disconnected from master. Will attempt to reconnect");
-                                                               master_connected = false;
-                                                       }
-                                                       iMaxSelectFd = close_agent_connection(agent);
-                                                       release_agent_blk(agent);
-                                                       free_agent_blk(agent_id);
-                                               } else {
-                                                       release_agent_blk(agent);
+                                       rc = send_hb(agent, info->hb_info.node_name,
+                                                    info->hb_info.agent_telnet_port,
+                                                    info->type);
+                                       if (rc != EN_LUTF_RC_OK) {
+                                               if (agent->id == master->id) {
+                                                       PERROR("Disconnected from master. Will attempt to reconnect");
+                                                       master_connected = false;
                                                }
+                                               dead = true;
                                        }
-                               } while (agent);
-                       }
+                                       release_agent_blk(agent, dead);
+                               }
+                       } while (agent);
                }
+
                /* store the current time */
-               time_1 = time_2;
+               memcpy(&time_1, &time_2, sizeof(time_1));
        }
 
        /* Zero out the g_tAllSet */
index 629f677..8327c47 100644 (file)
@@ -2,6 +2,7 @@
 #define LUTF_LISTENER_H
 
 #include "lutf_common.h"
+#include "lutf_agent.h"
 
 /*
  * lutf_listener_main
@@ -11,4 +12,8 @@ void *lutf_listener_main(void *usr_data);
 
 void lutf_listener_shutdown(void);
 
+void close_agent_connection(lutf_agent_blk_t *agent);
+
+void release_dead_list_agents(void);
+
 #endif /* LUTF_LISTENER_H */
index a3efe9d..20682c1 100644 (file)
@@ -5,7 +5,7 @@
 #include "lutf_listener.h"
 
 extern lutf_config_params_t g_lutf_cfg;
-bool g_py_inited = false;
+bool g_py_inited;
 
 static char *get_path_segment(char *path, int *len, char **more)
 {
@@ -27,14 +27,14 @@ static char *get_path_segment(char *path, int *len, char **more)
        return str;
 }
 
-static void python_run_interactive_shell(void)
+static lutf_rc_t python_run_interactive_shell(void)
 {
        char buf[MAX_STR_LEN + 20];
        char segment[MAX_STR_LEN];
        char *seg;
        char *more = g_lutf_cfg.py_path;
        int len = 0;
-       lutf_rc_t rc;
+       lutf_rc_t rc = EN_LUTF_RC_OK;
 
        PyRun_SimpleString("import code\n");
        PyRun_SimpleString("import os\n");
@@ -76,7 +76,8 @@ static void python_run_interactive_shell(void)
                rc = wait_for_agents(g_lutf_cfg.agents, 20);
                if (rc == EN_LUTF_RC_TIMEOUT) {
                        PERROR("Not all agents connected. Aborting tests");
-                       return;
+                       lutf_listener_shutdown();
+                       return EN_LUTF_RC_TIMEOUT;
                }
 
                /* update the LUTF internal database */
@@ -84,18 +85,32 @@ static void python_run_interactive_shell(void)
                PDEBUG("Agents reloaded. Dumping");
                PyRun_SimpleString("agents.dump()");
 
+               /* If a script is specified then a unique suite must be
+                * specified as well.
+                * If a suite_list is specified it takes precedence over
+                * a single suite parameter. A pattern of scripts can be
+                * provided to run matching scripts.
+                * If a suite parameter is provided then run that
+                * particular suite.
+                * Otherwise just run everything.
+                */
                if (g_lutf_cfg.script && strlen(g_lutf_cfg.script) > 0) {
                        snprintf(buf, MAX_STR_LEN,
                                 "suites['%s'].scripts['%s'].run()",
                                 g_lutf_cfg.suite,
                                 g_lutf_cfg.script);
+               } else if (g_lutf_cfg.suite_list &&
+                          strlen(g_lutf_cfg.suite_list) > 0) {
+                       snprintf(buf, MAX_STR_LEN,
+                                "suites.run(suite_list='%s', match='%s')",
+                                g_lutf_cfg.suite_list, pattern);
                } else if (g_lutf_cfg.suite && strlen(g_lutf_cfg.suite) > 0) {
                        snprintf(buf, MAX_STR_LEN,
                                 "suites['%s'].run('%s')",
                                 g_lutf_cfg.suite, pattern);
                } else {
                        snprintf(buf, MAX_STR_LEN,
-                                "suites.run('%s')", pattern);
+                                "suites.run(match='%s')", pattern);
                }
                PDEBUG("%s", buf);
                PyRun_SimpleString(buf);
@@ -106,7 +121,7 @@ static void python_run_interactive_shell(void)
                PDEBUG("Shutting down the LUTF");
                PyRun_SimpleString("me.exit()");
                lutf_listener_shutdown();
-               return;
+               return EN_LUTF_RC_OK;
        } else if (g_lutf_cfg.shell == EN_LUTF_RUN_INTERACTIVE) {
                int rc;
                char *intro;
@@ -137,36 +152,43 @@ static void python_run_interactive_shell(void)
                 */
                PDEBUG("Running in Daemon mode");
                sprintf(segment, "fname = os.path.join('%s', '%s')\n",
-                       g_lutf_cfg.lutf_path, OUT_PY_LOG);
+                       g_lutf_cfg.tmp_dir, OUT_PY_LOG);
                if (PyRun_SimpleString(segment)) {
                        PERROR("Failed to create log file");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                sprintf(segment, "logfile = open(fname, 'w')\n");
                if (PyRun_SimpleString(segment)) {
                        PERROR("Failed to open log file");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                if (PyRun_SimpleString("sys.stdout = sys.stderr = logfile\n")) {
                        PERROR("Failed to redirect stdout and stderr");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                if (PyRun_SimpleString("from lutf_telnet_sr import LutfTelnetServer\n")) {
                        PERROR("Failed to import LutfTelnetServer");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                sprintf(segment, "tns = LutfTelnetServer(%d)\n",
                        g_lutf_cfg.l_info.hb_info.agent_telnet_port);
                if (PyRun_SimpleString(segment)) {
                        PERROR("Failed to instantiate LutfTelnetServer");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                if (PyRun_SimpleString("tns.run()\n")) {
                        PERROR("Failed to run LutfTelnetServer instance");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
                if (PyRun_SimpleString("logfile.close()")) {
                        PERROR("Failed to close logfile");
+                       rc = EN_LUTF_RC_FAIL;
                        goto python_shutdown;
                }
 python_shutdown:
@@ -174,6 +196,8 @@ python_shutdown:
        }
        g_py_inited = false;
        lutf_listener_shutdown();
+
+       return rc;
 }
 
 /*
@@ -183,6 +207,7 @@ python_shutdown:
 lutf_rc_t python_init(void)
 {
        wchar_t program[5];
+       lutf_rc_t rc;
 
        swprintf(program, 3, L"%hs", "lutf");
 
@@ -199,14 +224,14 @@ lutf_rc_t python_init(void)
        //PySys_SetPath(new_path);
        //path = Py_GetPath();
 
-       python_run_interactive_shell();
+       rc = python_run_interactive_shell();
        PDEBUG("Python finalizing");
 
        Py_Finalize();
 
        PDEBUG("Python finalized");
 
-       return EN_LUTF_RC_OK;
+       return rc;
 }
 
 lutf_rc_t python_handle_rpc_request(char *rpc)
@@ -227,10 +252,10 @@ lutf_rc_t python_handle_rpc_request(char *rpc)
 
        gstate = PyGILState_Ensure();
 
-       str = PyUnicode_FromString((char*)"lutf");
+       str = PyUnicode_FromString((char *)"lutf");
        lutf = PyImport_Import(str);
-       me = PyObject_GetAttrString(lutf, (char*)"me");
-       handle_rpc_req = PyObject_GetAttrString(me, (char*)"handle_rpc_req");
+       me = PyObject_GetAttrString(lutf, (char *)"me");
+       handle_rpc_req = PyObject_GetAttrString(me, (char *)"handle_rpc_req");
        args = PyTuple_Pack(1, PyUnicode_FromString(rpc));
        result = PyObject_CallObject(handle_rpc_req, args);