From a55b6dafeae9b6e52a6b0afb9b9848ac4980b105 Mon Sep 17 00:00:00 2001 From: Amir Shehata Date: Mon, 5 Jul 2021 11:17:16 -0700 Subject: [PATCH] LU-10973 lnet: LUTF infrastructure updates Fix Agent management Handle python failures properly. Change default location for temporary files to be in /tmp/lutf Test-Parameters: trivial Signed-off-by: Amir Shehata Change-Id: I4e37b6226dfa12de4b7a1f5bfd87f84e91ee1dda Reviewed-on: https://review.whamcloud.com/44177 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Serguei Smirnov Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/tests/lutf/src/liblutf_agent.c | 290 +++++++++-------- lustre/tests/lutf/src/liblutf_connect.c | 32 +- lustre/tests/lutf/src/lutf.c | 40 ++- lustre/tests/lutf/src/lutf.h | 6 +- lustre/tests/lutf/src/lutf_agent.h | 24 +- lustre/tests/lutf/src/lutf_listener.c | 545 +++++++++++++++++--------------- lustre/tests/lutf/src/lutf_listener.h | 5 + lustre/tests/lutf/src/lutf_python.c | 49 ++- 8 files changed, 544 insertions(+), 447 deletions(-) diff --git a/lustre/tests/lutf/src/liblutf_agent.c b/lustre/tests/lutf/src/liblutf_agent.c index 970c6c6..ea35271 100644 --- a/lustre/tests/lutf/src/liblutf_agent.c +++ b/lustre/tests/lutf/src/liblutf_agent.c @@ -11,14 +11,16 @@ #include "lutf_agent.h" #include "lutf.h" #include "lutf_python.h" +#include "lutf_listener.h" static pthread_mutex_t agent_array_mutex; static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS]; +static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS]; /* TODO: this is probably not thread safe */ static char agent_state_str[128]; -extern bool g_agent_enable_hb; -extern struct in_addr g_local_ip; +static bool g_agent_enable_hb = true; +static struct in_addr g_local_ip; #define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n" @@ -33,15 +35,111 @@ char *get_local_ip() return inet_ntoa(g_local_ip); } -void release_agent_blk(lutf_agent_blk_t *agent) +static void insert_dead_agent_locked(lutf_agent_blk_t *agent) { - /* release the agent blk mutex */ - MUTEX_LOCK(&agent->mutex); - if (agent) { - assert(agent->ref_count != 0); - agent->ref_count--; + int i = 0; + + for (i = 0; i < MAX_NUM_AGENTS; i++) { + if (agent_dead_list[i] == NULL) { + agent->state |= LUTF_AGENT_STATE_DEAD; + agent_dead_list[i] = agent; + agent->id = i; + break; + } + } + assert(i < MAX_NUM_AGENTS); +} + +static void del_dead_agent_locked(lutf_agent_blk_t *agent) +{ + assert(agent && + agent->state & LUTF_AGENT_STATE_DEAD && + agent_dead_list[agent->id] != NULL && + agent_dead_list[agent->id] == agent); + + assert(agent->ref_count > 0); + agent->ref_count--; + + if (agent->ref_count == 0) { + agent_dead_list[agent->id] = NULL; + memset(agent, 0xdeadbeef, sizeof(*agent)); + free(agent); } +} + +void release_dead_list_agents(void) +{ + int i; + + MUTEX_LOCK(&agent_array_mutex); + for (i = 0; i < MAX_NUM_AGENTS; i++) { + lutf_agent_blk_t *agent; + + agent = agent_dead_list[i]; + + if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) { + agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN; + del_dead_agent_locked(agent); + } + } + MUTEX_UNLOCK(&agent_array_mutex); +} + +static inline bool agent_alive(lutf_agent_blk_t *agent) +{ + bool viable = false; + + MUTEX_LOCK(&agent->mutex); + if (agent->state & LUTF_AGENT_STATE_ALIVE) + viable = true; MUTEX_UNLOCK(&agent->mutex); + + return viable; +} + +void release_agent_blk(lutf_agent_blk_t *agent, int dead) +{ + assert(agent); + + MUTEX_LOCK(&agent_array_mutex); + MUTEX_LOCK(&agent->mutex); + + if (agent->state & LUTF_AGENT_STATE_ALIVE) { + 
/* sanity check */ + assert(agent_live_list[agent->id] != NULL && + agent_live_list[agent->id] == agent); + } else { + MUTEX_UNLOCK(&agent->mutex); + del_dead_agent_locked(agent); + MUTEX_UNLOCK(&agent_array_mutex); + return; + } + + assert(agent->ref_count > 0); + agent->ref_count--; + + if (agent->ref_count == 0) { + agent_live_list[agent->id] = NULL; + assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS)); + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + close_agent_connection(agent); + memset(agent, 0xdeadbeef, sizeof(*agent)); + /* free the block */ + free(agent); + } else if (dead) { + agent_live_list[agent->id] = NULL; + insert_dead_agent_locked(agent); + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE); + unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED); + unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED); + close_agent_connection(agent); + } else { + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + } } void acquire_agent_blk(lutf_agent_blk_t *agent) @@ -67,8 +165,7 @@ char *agent_state2str(lutf_agent_blk_t *agent) return agent_state_str; } -static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list, - struct sockaddr_in *addr) +static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr) { int i; lutf_agent_blk_t *agent; @@ -78,10 +175,11 @@ static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list, MUTEX_LOCK(&agent_array_mutex); for (i = 0; i < MAX_NUM_AGENTS; i++) { - agent = list[i]; - if ((agent) && + agent = agent_live_list[i]; + if ((agent) && agent_alive(agent) && (agent->addr.sin_addr.s_addr == addr->sin_addr.s_addr)) { + acquire_agent_blk(agent); MUTEX_UNLOCK(&agent_array_mutex); return agent; } @@ -102,7 +200,7 @@ int get_next_active_agent(int idx, lutf_agent_blk_t **out) MUTEX_LOCK(&agent_array_mutex); for (i = idx; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if (agent) { + if (agent && agent_alive(agent)) { i++; acquire_agent_blk(agent); break; @@ -119,13 +217,11 @@ out: lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr) { lutf_agent_blk_t *agent; - agent = find_agent_blk_by_addr(agent_live_list, addr); + + agent = find_agent_blk_by_addr(addr); if (!agent) return find_free_agent_blk(addr); - - MUTEX_LOCK(&agent_array_mutex); - acquire_agent_blk(agent); - MUTEX_UNLOCK(&agent_array_mutex); + release_agent_blk(agent, false); return agent; } @@ -161,6 +257,11 @@ void agent_enable_hb(void) g_agent_enable_hb = true; } +int agent_get_hb(void) +{ + return g_agent_enable_hb; +} + lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr) { int i = 0; @@ -170,8 +271,10 @@ lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr) MUTEX_LOCK(&agent_array_mutex); /* iterate through the array to find a free entry */ - while (agent_live_list[i] != NULL) - i++; + for (i = 0; i < MAX_NUM_AGENTS; i++) { + if (agent_live_list[i] == NULL) + break; + } if (i >= MAX_NUM_AGENTS) { MUTEX_UNLOCK(&agent_array_mutex); @@ -226,7 +329,10 @@ lutf_agent_blk_t *find_agent_blk_by_id(int idx) agent = agent_live_list[idx]; - acquire_agent_blk(agent); + if (agent_alive(agent)) + acquire_agent_blk(agent); + else + agent = NULL; /* release the array mutex */ MUTEX_UNLOCK(&agent_array_mutex); @@ -244,53 +350,9 @@ void set_agent_state(lutf_agent_blk_t *agent, unsigned int state) void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state) { - bool zombie = false; - MUTEX_LOCK(&agent->mutex); 
agent->state &= ~state; - if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) && - (agent->state & LUTF_AGENT_ZOMBIE)) - zombie = true; - MUTEX_UNLOCK(&agent->mutex); - - if (zombie) - free_agent_blk(agent->id); -} - -void free_agent_blk(int id) -{ - lutf_agent_blk_t *agent; - - /* grab the array mutex */ - MUTEX_LOCK(&agent_array_mutex); - - /* if the blk is non null grab the mutex. - * possibly block until previous user is done - */ - if (agent_live_list[id] == NULL) { - MUTEX_UNLOCK(&agent_array_mutex); - return; - } - - agent = agent_live_list[id]; - - MUTEX_LOCK(&agent->mutex); - if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) { - MUTEX_UNLOCK(&agent->mutex); - MUTEX_UNLOCK(&agent_array_mutex); - PDEBUG("delay deleting agent %s\n", agent->name); - set_agent_state(agent, LUTF_AGENT_ZOMBIE); - return; - } MUTEX_UNLOCK(&agent->mutex); - - agent_live_list[id] = NULL; - - /* release the array mutex */ - MUTEX_UNLOCK(&agent_array_mutex); - - /* free the block */ - free(agent); } char *agent_ip2str(lutf_agent_blk_t *agent) @@ -305,6 +367,7 @@ int get_num_agents(void) { int i; int num = 0; + for (i = 0; i < MAX_NUM_AGENTS; i++) { if (agent_live_list[i] != NULL) num++; @@ -313,8 +376,7 @@ int get_num_agents(void) return num; } -/* no lock version of the function */ -static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) +lutf_agent_blk_t *find_agent_blk_by_name(char *name) { lutf_agent_blk_t *agent; int i; @@ -326,10 +388,13 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) for (i = 0; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if ((agent) && + if ((agent) && agent_alive(agent) && ((strcmp(agent->name, name) == 0) || (strcmp(name, TEST_ROLE_GRC) == 0))) { + acquire_agent_blk(agent); break; + } else { + agent = NULL; } } @@ -339,18 +404,6 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) return agent; } -lutf_agent_blk_t *find_agent_blk_by_name(char *name) -{ - lutf_agent_blk_t *agent; - - agent = find_agent_blk_by_name_nl(name); - if (agent) - acquire_agent_blk(agent); - - /* return the agent blk */ - return agent; -} - lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) { lutf_agent_blk_t *agent; @@ -367,9 +420,12 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) for (i = 0; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if ((agent) && (agent->addr.sin_addr.s_addr == + if ((agent) && agent_alive(agent) && + (agent->addr.sin_addr.s_addr == addr.sin_addr.s_addr)) break; + else + agent = NULL; } if (agent) @@ -382,39 +438,6 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) return agent; } -void agent_hb_check(struct timeval *t, lutf_type_t me) -{ - lutf_agent_blk_t *agent; - int i; - - /* grab the array mutex */ - MUTEX_LOCK(&agent_array_mutex); - - for (i = 0; i < MAX_NUM_AGENTS; i++) { - agent = agent_live_list[i]; - if (agent && agent->node_type != me) { - acquire_agent_blk(agent); - if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) { - int agent_id = agent->id; - /* agent didn't send a HB move to dead - * list - */ - PERROR("agent %s presumed dead", agent->name); - release_agent_blk(agent); - MUTEX_UNLOCK(&agent_array_mutex); - /* free_agent_blk() grabs the mutex */ - free_agent_blk(agent_id); - MUTEX_LOCK(&agent_array_mutex); - continue; - } - release_agent_blk(agent); - } - } - - /* release the array mutex */ - MUTEX_UNLOCK(&agent_array_mutex); -} - lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout) { struct timeval start; @@ -438,12 +461,13 @@ lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout) 
PDEBUG("Waiting for Agents"); while (cYAML_get_next_seq_item(agents, &a) != NULL) { PDEBUG("Looking up: %s", a->cy_valuestring); - if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) { + agent = find_agent_blk_by_name(a->cy_valuestring); + if (agent) { + PDEBUG("agent %s found\n", agent->name); + release_agent_blk(agent, false); + } else { found = false; break; - } else { - PDEBUG("agent %s found\n", agent->name); - release_agent_blk(agent); } } if (!found) @@ -484,7 +508,8 @@ int get_num_agents_remote(char *masterIP, int masterPort) htons(masterPort), false, false); if (remoteSocket < 0) { - PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket)); + PERROR("establishTCPConnection failure: %s", + lutf_rc2str(remoteSocket)); rc = remoteSocket; goto out; } @@ -547,7 +572,9 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) agent_blk->name, inet_ntoa(agent_blk->addr.sin_addr), agent_blk->listen_port); - /* in network byte order, convert so we can have a uniform API */ + /* in network byte order, convert so we can have a + * uniform API + */ agent_blk->iRpcFd = establishTCPConnection( agent_blk->addr.sin_addr.s_addr, htons(agent_blk->listen_port), @@ -591,7 +618,8 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) goto fail_rpc; } - rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout); + rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), + timeout); if (rc != EN_LUTF_RC_OK) { PERROR("Failed to recv rpc body in timeout %d", timeout); goto fail_rpc; @@ -604,15 +632,15 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) * appropriately. */ *rsp = recvBuf; - release_agent_blk(agent_blk); - unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + release_agent_blk(agent_blk, false); return EN_LUTF_RC_OK; fail_rpc: - release_agent_blk(agent_blk); unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN); + release_agent_blk(agent_blk, true); if (recvBuf) free(recvBuf); msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+ @@ -640,13 +668,14 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml) lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL; lutf_agent_blk_t *agent_blk; int msg_size; + bool dead = false; if (!agent || !yaml) goto out; msg_size = strlen(yaml) + 1; - agent_blk = find_agent_blk_by_name_nl(agent); + agent_blk = find_agent_blk_by_name(agent); if (!agent_blk) { PERROR("Can't find agent with name: %s", agent); goto out; @@ -657,14 +686,21 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml) MUTEX_UNLOCK(&agent_blk->mutex); PERROR("agent_blk %s doesn't have an RPC channel", agent_blk->name); - goto out; + goto release_agent; } MUTEX_UNLOCK(&agent_blk->mutex); + set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); PDEBUG("sending rpc response\n%s", yaml); rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size, EN_MSG_TYPE_RPC_RESPONSE); - + if (rc) + dead = true; +release_agent: + unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + release_agent_blk(agent_blk, dead); + if (dead) + set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN); out: return rc; } diff --git a/lustre/tests/lutf/src/liblutf_connect.c b/lustre/tests/lutf/src/liblutf_connect.c index 796571a..a14cbbd 100644 --- a/lustre/tests/lutf/src/liblutf_connect.c +++ b/lustre/tests/lutf/src/liblutf_connect.c @@ -24,7 +24,8 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA, fd_set rset, wset; struct 
timeval tval; - if ((iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen)) < 0) { + iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen); + if (iN < 0) { if (errno != EINPROGRESS) { PERROR("Connect Failed: %s:%d", strerror(errno), errno); return EN_LUTF_RC_FAIL; @@ -38,8 +39,9 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA, tval.tv_sec = iNsec; tval.tv_usec = 0; - if ((iN = select(iSockFd+1, &rset, &wset, NULL, - iNsec ? &tval : NULL)) == 0) { + iN = select(iSockFd+1, &rset, &wset, NULL, + iNsec ? &tval : NULL); + if (iN == 0) { errno = ETIMEDOUT; PERROR("Select timed out"); return EN_LUTF_RC_FAIL; @@ -50,8 +52,10 @@ static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA, if (FD_ISSET(iSockFd, &rset) || FD_ISSET(iSockFd, &wset)) { iLen = sizeof(iError); - if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError, (socklen_t *)&iLen) < 0) { - PERROR("getsockopt failed indicating connect failure, errno= %d", errno); + if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError, + (socklen_t *)&iLen) < 0) { + PERROR("getsockopt failed indicating connect failure, errno= %d", + errno); return EN_LUTF_RC_FAIL; } } else { @@ -81,8 +85,8 @@ int establishTCPConnection(unsigned long uiAddress, lutf_rc_t eRc = EN_LUTF_RC_OK; /* Create TCP socket */ - if ((rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) - == -1) + rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (rsocket == -1) return EN_LUTF_RC_FAIL; /* Turn off Nagle's algorithm for this TCP socket. */ @@ -109,11 +113,9 @@ int establishTCPConnection(unsigned long uiAddress, tm_addr.sin_port = (endian) ? htons(iPort) : iPort; tm_addr.sin_family = AF_INET; - if ((eRc = doNonBlockingConnect(rsocket, - (struct sockaddr *)&tm_addr, - sizeof(tm_addr), - SOCKET_CONN_TIMEOUT_SEC)) - != EN_LUTF_RC_OK) { + eRc = doNonBlockingConnect(rsocket, (struct sockaddr *)&tm_addr, + sizeof(tm_addr), SOCKET_CONN_TIMEOUT_SEC); + if (eRc != EN_LUTF_RC_OK) { close(rsocket); return eRc; } @@ -127,7 +129,7 @@ lutf_rc_t closeTcpConnection(int iTcpSocket) PDEBUG("closing socket %d", iTcpSocket); rc = close(iTcpSocket); - if (!rc && errno != EINPROGRESS && errno != ECONNRESET) { + if (rc && errno != EINPROGRESS && errno != ECONNRESET) { PERROR("failed to close %d:%d\n", iTcpSocket, errno); return EN_LUTF_RC_FAIL; } @@ -275,7 +277,9 @@ lutf_rc_t readTcpMessage(int iFd, char *pcBuffer, if (tNread < 0) { if (errno == EINTR) { - /* We were interrupted, but this is not an error condition. */ + /* We were interrupted, but this is not an + * error condition. 
+ */ tNread = 0; } else if ((errno == EAGAIN) && (!iTimeout)) { return EN_LUTF_RC_SOCKET_FAIL; diff --git a/lustre/tests/lutf/src/lutf.c b/lustre/tests/lutf/src/lutf.c index e174fe9..4ed1bf0 100644 --- a/lustre/tests/lutf/src/lutf.c +++ b/lustre/tests/lutf/src/lutf.c @@ -25,16 +25,11 @@ #define HB_TIMEOUT 2 -struct in_addr g_local_ip; FILE *out; char *outlog; -/*externs needed by getopt lib*/ -extern char *optarg; -extern int optind; - static void -lutf_help_usage(const struct option *long_options, const char *description[]) +lutf_help_usage(const struct option *long_options, const char *const description[]) { int i = 0; @@ -87,7 +82,8 @@ lutf_rc_t hostname_to_ip(char *hostname, char *ip, int len) hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6 hints.ai_socktype = SOCK_STREAM; - if ((rv = getaddrinfo(hostname, "http", &hints, &servinfo)) != 0) { + rv = getaddrinfo(hostname, "http", &hints, &servinfo); + if (rv != 0) { PERROR("getaddrinfo: %s\n", gai_strerror(rv)); return EN_LUTF_RC_BAD_ADDR; } @@ -182,15 +178,14 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree, /* maybe it's a host name so let's try * that out */ - rc = EN_LUTF_RC_BAD_ADDR; - if ((rc = hostname_to_ip(tmp->cy_valuestring, maddr, - sizeof(maddr))) - != EN_LUTF_RC_OK) { + rc = hostname_to_ip(tmp->cy_valuestring, maddr, + sizeof(maddr)); + if (rc != EN_LUTF_RC_OK) { *elem = "master-address"; return rc; } else if (!inet_aton(maddr, &addr)) { *elem = "master-address"; - return rc; + return EN_LUTF_RC_BAD_ADDR; } } cfg->l_info.hb_info.master_address.sin_addr = addr; @@ -244,8 +239,8 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree, tmp = get_value(head, "node-name"); if (tmp) { if (tmp->cy_type == CYAML_TYPE_STRING) { - strncpy(cfg->l_info.hb_info.node_name, tmp->cy_valuestring, - MAX_STR_LEN); + strncpy(cfg->l_info.hb_info.node_name, + tmp->cy_valuestring, MAX_STR_LEN); cfg->l_info.hb_info.node_name[MAX_STR_LEN - 1] = '\0'; } else { *elem = "node-name"; @@ -268,6 +263,19 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree, return EN_LUTF_RC_MISSING_PARAM; } + tmp = get_value(head, "suite-list"); + if (tmp && cfg->l_info.type == EN_LUTF_MASTER) { + if (tmp->cy_type == CYAML_TYPE_STRING) + if (strlen(tmp->cy_valuestring) > 0) + cfg->suite_list = tmp->cy_valuestring; + else + cfg->suite_list = NULL; + else { + *elem = "suite-list"; + return EN_LUTF_RC_BAD_PARAM; + } + } + tmp = get_value(head, "suite"); if (tmp && cfg->l_info.type == EN_LUTF_MASTER) { if (tmp->cy_type == CYAML_TYPE_STRING) @@ -323,7 +331,7 @@ lutf_rc_t extract_config_parameters(struct cYAML *config_tree, return EN_LUTF_RC_BAD_PARAM; } } else { - cfg->results_file = "lutf_def_results"; + cfg->results_file = "/tmp/lutf/lutf_def_results"; } tmp = get_value(head, "agent-list"); @@ -377,7 +385,7 @@ main(int argc, char *argv[]) {NULL, 0, NULL, 0} }; - const char *description[] = { + static const char * const description[] = { /*'c'*/":\n\t\tYAML config file", /*'h'*/":\n\t\tPrint this help", NULL diff --git a/lustre/tests/lutf/src/lutf.h b/lustre/tests/lutf/src/lutf.h index 79bd62d..65546cb 100644 --- a/lustre/tests/lutf/src/lutf.h +++ b/lustre/tests/lutf/src/lutf.h @@ -74,6 +74,8 @@ typedef struct lutf_config_params_s { char *py_path; /* other python specific paths */ char *master_name; /* name of master. Important if I'm an agent */ char *suite; /* name of suite to run. Run all if not present */ + char *suite_list; /* list of suites to run. 
Takes precedence + over single suite parameter */ char *script; /* name of script to run. Suite must be specified */ char *pattern; /* file match pattern */ char *results_file; /* path to results file */ @@ -85,9 +87,9 @@ typedef struct lutf_config_params_s { lutf_config_params_t g_lutf_cfg; -static inline char *lutf_rc2str(lutf_rc_t rc) +static inline const char *lutf_rc2str(lutf_rc_t rc) { - char *str[] = { + static const char * const str[] = { [EN_LUTF_RC_OK] = "RC_OK", [EN_LUTF_RC_FAIL*-1] = "RC_FAIL", [EN_LUTF_RC_SYS_ERR*-1] = "RC_SYSTEM_ERROR", diff --git a/lustre/tests/lutf/src/lutf_agent.h b/lustre/tests/lutf/src/lutf_agent.h index f509004..7e769b9 100644 --- a/lustre/tests/lutf/src/lutf_agent.h +++ b/lustre/tests/lutf/src/lutf_agent.h @@ -12,7 +12,8 @@ struct cYAML; #define LUTF_AGENT_HB_CHANNEL_CONNECTED (1 << 1) #define LUTF_AGENT_RPC_CHANNEL_CONNECTED (1 << 2) #define LUTF_AGENT_WORK_IN_PROGRESS (1 << 3) -#define LUTF_AGENT_ZOMBIE (1 << 4) +#define LUTF_AGENT_NEED_LISTEN_CLEAN (1 << 4) +#define LUTF_AGENT_STATE_DEAD (1 << 5) typedef struct lutf_agent_blk_s { pthread_mutex_t mutex; @@ -88,12 +89,6 @@ lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr); lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr); /* - * free_agent_blk - * Free an agent blk that no longer is needed - */ -void free_agent_blk(int id); - -/* * acquire_agent_blk * acquire the agent for work */ @@ -103,7 +98,7 @@ void acquire_agent_blk(lutf_agent_blk_t *agent); * release_agent_blk * Release the agent blk */ -void release_agent_blk(lutf_agent_blk_t *agent); +void release_agent_blk(lutf_agent_blk_t *agent, int dead); /* * agent_ip2str @@ -112,13 +107,6 @@ void release_agent_blk(lutf_agent_blk_t *agent); char *agent_ip2str(lutf_agent_blk_t *agent); /* - * agent_hb_check - * Given a time struct insure that the agent doesn't exceed the HB - * time. - */ -void agent_hb_check(struct timeval *t, lutf_type_t whoami); - -/* * agent_disable_hb * Disables the HB */ @@ -131,6 +119,12 @@ void agent_disable_hb(void); void agent_enable_hb(void); /* + * agent_get_hb + * Get current HB state + */ +int agent_get_hb(void); + +/* * get the number of registered agents */ int get_num_agents(void); diff --git a/lustre/tests/lutf/src/lutf_listener.c b/lustre/tests/lutf/src/lutf_listener.c index 51690b6..781e52e 100644 --- a/lustre/tests/lutf/src/lutf_listener.c +++ b/lustre/tests/lutf/src/lutf_listener.c @@ -18,9 +18,9 @@ #include "lutf_listener.h" static fd_set g_tAllSet; -static bool g_bShutdown = false; +static bool g_bShutdown; static int g_iListenFd = INVALID_TCP_SOCKET; -bool g_agent_enable_hb = true; +static int g_iMaxSelectFd = INVALID_TCP_SOCKET; typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent); @@ -181,14 +181,16 @@ static lutf_rc_t init_comm(unsigned short server_port) sServAddr.sin_addr.s_addr = htonl(INADDR_ANY); sServAddr.sin_port = htons(server_port); - if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) { + if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, + sizeof(sServAddr)) < 0) { /* Cannot bind our listening socket. */ closeTcpConnection(g_iListenFd); return EN_LUTF_RC_BIND_FAILED; } - /* Let the system know we wish to listen to this port for - * connections. */ + /* Let the system know we wish to listen to this port for + * connections. 
+ */ if (listen(g_iListenFd, 2) < 0) { /* Cannot listen to socket, close and fail */ closeTcpConnection(g_iListenFd); @@ -209,19 +211,6 @@ static lutf_rc_t init_comm(unsigned short server_port) return EN_LUTF_RC_OK; } -static inline int close_agent_connection(lutf_agent_blk_t *agent) -{ - FD_CLR(agent->iFileDesc, &g_tAllSet); - FD_CLR(agent->iRpcFd, &g_tAllSet); - closeTcpConnection(agent->iRpcFd); - closeTcpConnection(agent->iFileDesc); - agent->state &= - ~LUTF_AGENT_RPC_CHANNEL_CONNECTED; - agent->state &= - ~LUTF_AGENT_HB_CHANNEL_CONNECTED; - return get_highest_fd(); -} - lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port, int type) { @@ -250,8 +239,7 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd) /* we assume the first connection is an HB connection */ if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) { if (agent->iFileDesc != INVALID_TCP_SOCKET) { - PERROR("agent in unexpected state. " - "state is %s, but HB FD is %d", + PERROR("agent in unexpected state. state is %s, but HB FD is %d", agent_state2str(agent), fd); return EN_LUTF_RC_SYS_ERR; } else { @@ -262,8 +250,7 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd) } } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) { if (agent->iRpcFd != INVALID_TCP_SOCKET) { - PERROR("agent in unexpected state. " - "state is %s, but RPC FD is %d", + PERROR("agent in unexpected state. state is %s, but RPC FD is %d", agent_state2str(agent), fd); return EN_LUTF_RC_SYS_ERR; } else { @@ -279,6 +266,44 @@ lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd) return EN_LUTF_RC_SYS_ERR; } +void close_agent_connection(lutf_agent_blk_t *agent) +{ + if (agent->iFileDesc != INVALID_TCP_SOCKET) { + FD_CLR(agent->iFileDesc, &g_tAllSet); + closeTcpConnection(agent->iFileDesc); + agent->iFileDesc = -1; + } + if (agent->iRpcFd != INVALID_TCP_SOCKET) { + FD_CLR(agent->iRpcFd, &g_tAllSet); + closeTcpConnection(agent->iRpcFd); + agent->iRpcFd = -1; + } + g_iMaxSelectFd = get_highest_fd(); +} + +void agent_hb_check(struct timeval *t, lutf_type_t me) +{ + lutf_agent_blk_t *agent; + int i = 0; + + for (i = 0; i < MAX_NUM_AGENTS; i++) { + agent = find_agent_blk_by_id(i); + + if (agent && agent->node_type != me) { + if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) { + /* agent didn't send a HB move to dead + * list + */ + PERROR("agent %s presumed dead", agent->name); + release_agent_blk(agent, true); + continue; + } + } + if (agent) + release_agent_blk(agent, false); + } +} + /* * lutf_listener_main * main loop. Listens for incoming agent connections, and for agent @@ -300,7 +325,6 @@ void *lutf_listener_main(void *usr_data) socklen_t tCliLen; fd_set tReadSet; int iNReady; - int iMaxSelectFd; int i; lutf_rc_t rc; lutf_agent_blk_t *agent = NULL, *master = NULL; @@ -323,287 +347,286 @@ void *lutf_listener_main(void *usr_data) agent_init(); - iMaxSelectFd = g_iListenFd; - - gettimeofday(&time_1, NULL); + g_iMaxSelectFd = g_iListenFd; + gettimeofday(&time_2, NULL); - /* Main Processing Loop: Keep going until we have reason to shutdown. */ + /* Main Processing Loop: Keep going until we have reason + * to shutdown. + */ while (!g_bShutdown) { /* Wait on our select mask for an event to occur. 
*/ - tReadSet = g_tAllSet; - select_to.tv_sec = HB_TO; select_to.tv_usec = 0; - iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to); + FD_ZERO(&tReadSet); + tReadSet = g_tAllSet; + iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL, + &select_to); + + release_dead_list_agents(); /* Determine if we failed the select call */ if (iNReady < 0) { /* Check to see if we were interrupted by a signal. */ if ((errno == EINTR) || (errno == EAGAIN)) { PERROR("Select failure: errno = %d", errno); - } else { - /* If this is an ECONNABORTED error, just ignore it. */ - if (errno != ECONNABORTED) { - /* Raise a fatal alarm. */ - /* Shut down */ - PERROR("Shutting down Listener thread. errno: %d", errno); - g_bShutdown = true; - } + } else if (errno != ECONNABORTED) { + /* If this is an ECONNABORTED error, just + * ignore it. Raise a fatal alarm and shut + * down. + */ + PERROR("Shutting down Listener thread. errno: %d", + errno); + lutf_listener_shutdown(); } - } else { - if (FD_ISSET(g_iListenFd, &tReadSet)) { - /* A new client is trying to connect. */ - tCliLen = sizeof(sCliAddr); - iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr, - &tCliLen); - if (iConnFd < 0) { - /* Cannot accept new connection...just ignore. */ - if (errno != EWOULDBLOCK) - PERROR("Error on accept(), errno = %d", errno); - } else { - /* Try to see if we have an agent - * with the same address, since - * agents can have multiple tcp - * connections open - */ - agent = find_create_agent_blk_by_addr(&sCliAddr); - if (!agent) { - /* Cannot support more clients...just ignore. */ - PERROR("Cannot accept more clients"); - closeTcpConnection(iConnFd); - } else { - int iOption, iFlags; - - rc = complete_agent_connection(agent, - iConnFd); - if (rc != EN_LUTF_RC_OK) { - int agent_id = agent->id; - iMaxSelectFd = close_agent_connection(agent); - release_agent_blk(agent); - free_agent_blk(agent_id); - continue; - } - /* all nodes listen on the - * same port - */ - agent->listen_port = info->listen_port; - - /* Add new client to our select mask. */ - FD_SET(iConnFd, &g_tAllSet); - iMaxSelectFd = get_highest_fd(); - - /* Ok, it seems that the connected socket gains - * the same flags as the listen socket. We want - * to make it blocking here. - */ - iFlags = fcntl(iConnFd, F_GETFL, 0); - fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK)); - - /* And, we want to turn off Nagle's algorithm to - * reduce latency - */ - iOption = 1; - setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY, - (void *)&iOption, - sizeof(iOption)); - - PDEBUG("Received a connection from %s on FD %d\n", - inet_ntoa(agent->addr.sin_addr), iConnFd); - - release_agent_blk(agent); - } - } + /* store the current time */ + time_1 = time_2; - /* See if there are other messages waiting. */ - iNReady--; - } + /* Zero out the g_tAllSet */ + FD_ZERO(&g_tAllSet); - /* need to iterate through the clients and see if a - * message was sent to any of them - */ - for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) { - /* reset the return code to avoid misbehaving on previous - * returns + continue; + } + + if (FD_ISSET(g_iListenFd, &tReadSet)) { + /* A new client is trying to connect. */ + tCliLen = sizeof(sCliAddr); + iConnFd = accept(g_iListenFd, + (struct sockaddr *) &sCliAddr, + &tCliLen); + if (iConnFd < 0) { + /* Cannot accept new connection... + * just ignore. 
*/ - rc = EN_LUTF_RC_OK; + if (errno != EWOULDBLOCK) + PERROR("Error on accept(), errno = %d", + errno); + } else { + /* Try to see if we have an agent + * with the same address, since + * agents can have multiple tcp + * connections open + */ + agent = find_create_agent_blk_by_addr(&sCliAddr); + if (!agent) { + /* Cannot support more clients...just ignore. */ + PERROR("Cannot accept more clients"); + closeTcpConnection(iConnFd); + } else { + int iOption, iFlags; - if ((agent = find_agent_blk_by_id(i))) { - int hb_fd = INVALID_TCP_SOCKET; - int rpc_fd = INVALID_TCP_SOCKET; + rc = complete_agent_connection(agent, + iConnFd); + if (rc != EN_LUTF_RC_OK) { + release_agent_blk(agent, true); + continue; + } - if (FD_ISSET(agent->iFileDesc, &tReadSet)) - hb_fd = agent->iFileDesc; - if (FD_ISSET(agent->iRpcFd, &tReadSet)) - rpc_fd = agent->iRpcFd; + /* all nodes listen on the + * same port + */ + agent->listen_port = info->listen_port; - if (hb_fd == INVALID_TCP_SOCKET && - rpc_fd == INVALID_TCP_SOCKET) - continue; + /* Add new client to our select mask. */ + FD_SET(iConnFd, &g_tAllSet); + g_iMaxSelectFd = get_highest_fd(); - /* process heart beat */ - if (hb_fd != INVALID_TCP_SOCKET) { - /* process the message */ - rc = process_agent_message(agent, hb_fd); - if (rc) - PERROR("msg failure: %s", - lutf_rc2str(rc)); - } - if (rc == EN_LUTF_RC_SOCKET_FAIL) { - int agent_id = agent->id; - if (agent->id == master->id) { - PERROR("Disconnected from master. Will attempt to reconnect"); - master_connected = false; - } - iMaxSelectFd = close_agent_connection(agent); - release_agent_blk(agent); - free_agent_blk(agent_id); - continue; - } + /* Ok, it seems that the connected socket gains + * the same flags as the listen socket. We want + * to make it blocking here. + */ + iFlags = fcntl(iConnFd, F_GETFL, 0); + fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK)); - /* process rpc */ - if (rpc_fd != INVALID_TCP_SOCKET) { - /* process the message */ - rc = process_agent_message(agent, rpc_fd); - if (rc) - PERROR("msg failure: %s", - lutf_rc2str(rc)); - } - if (rc == EN_LUTF_RC_SOCKET_FAIL) { - int agent_id = agent->id; - if (agent->id == master->id) { - PERROR("Disconnected from master. Will attempt to reconnect"); - master_connected = false; - } - iMaxSelectFd = close_agent_connection(agent); - release_agent_blk(agent); - free_agent_blk(agent_id); - continue; - } - release_agent_blk(agent); + /* And, we want to turn off Nagle's algorithm to + * reduce latency + */ + iOption = 1; + setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY, + (void *)&iOption, + sizeof(iOption)); + + PDEBUG("Received a connection from %s on FD %d\n", + inet_ntoa(agent->addr.sin_addr), iConnFd); } } - /* establish connection with the master if I'm an agent - * and I have not connected to the master yet. - * Otherwise send a heart beat + /* See if there are other messages waiting. 
*/ + iNReady--; + } + + /* need to iterate through the clients and see if a + * message was sent to any of them + */ + for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) { + /* reset the return code to avoid misbehaving on previous + * returns */ - if (!master_connected && - strlen(g_lutf_cfg.master_name) != 0) { - PDEBUG("Attempting a connection on master %s", - g_lutf_cfg.master_name); - master = find_free_agent_blk(&info->hb_info.master_address); - if (!master) { - PERROR("Failed to allocate agent block"); + rc = EN_LUTF_RC_OK; + + if ((agent = find_agent_blk_by_id(i))) { + int hb_fd = INVALID_TCP_SOCKET; + int rpc_fd = INVALID_TCP_SOCKET; + + release_agent_blk(agent, false); + + if (FD_ISSET(agent->iFileDesc, &tReadSet)) + hb_fd = agent->iFileDesc; + if (FD_ISSET(agent->iRpcFd, &tReadSet)) + rpc_fd = agent->iRpcFd; + + if (hb_fd == INVALID_TCP_SOCKET && + rpc_fd == INVALID_TCP_SOCKET) continue; - } - iConnFd = establishTCPConnection( - info->hb_info.master_address.sin_addr.s_addr, - htons(info->hb_info.master_address.sin_port), - true, false); - - if (iConnFd < 0) { - int master_id = master->id; - - PERROR("establishTCPConnection failure: %s. Clearing set", - lutf_rc2str(iConnFd)); - iMaxSelectFd = close_agent_connection(master); - release_agent_blk(master); - free_agent_blk(master_id); - PERROR("Disconnected from master. Will attempt to reconnect"); - master_connected = false; + /* process heart beat */ + if (hb_fd != INVALID_TCP_SOCKET) { + /* process the message */ + rc = process_agent_message(agent, hb_fd); + if (rc) + PERROR("msg failure: %s", + lutf_rc2str(rc)); + } + if (rc == EN_LUTF_RC_SOCKET_FAIL) { + if (agent->id == master->id) { + PERROR("Disconnected from master. Will attempt to reconnect"); + master_connected = false; + } + release_agent_blk(agent, true); continue; } - master->iFileDesc = iConnFd; - memcpy(&master->addr, - &info->hb_info.master_address, - sizeof(master->addr)); - strncpy(master->name, g_lutf_cfg.master_name, - MAX_STR_LEN); - master->name[MAX_STR_LEN-1] = '\0'; - master->node_type = EN_LUTF_MASTER; - gethostname(master->hostname, MAX_STR_LEN); - master->telnet_port = info->hb_info.agent_telnet_port; - release_agent_blk(master); - - PDEBUG("Connected to master %s on fd %d", - master->name, master->iFileDesc); - - /* - * add the master FD to the select FD set - * to be able to process master messages - */ - FD_SET(iConnFd, &g_tAllSet); - iMaxSelectFd = get_highest_fd(); + /* process rpc */ + if (rpc_fd != INVALID_TCP_SOCKET) { + /* process the message */ + rc = process_agent_message(agent, rpc_fd); + if (rc) + PERROR("msg failure: %s", + lutf_rc2str(rc)); + } + if (rc == EN_LUTF_RC_SOCKET_FAIL) { + if (agent->id == master->id) { + PERROR("Disconnected from master. Will attempt to reconnect"); + master_connected = false; + } + release_agent_blk(agent, true); + continue; + } + } + } - master_connected = true; - master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED; + /* establish connection with the master if I'm an agent + * and I have not connected to the master yet. 
+ * Otherwise send a heart beat + */ + if (!master_connected && + strlen(g_lutf_cfg.master_name) != 0) { + PDEBUG("Attempting a connection on master %s", + g_lutf_cfg.master_name); + master = find_free_agent_blk(&info->hb_info.master_address); + if (!master) { + PERROR("Failed to allocate agent block"); + continue; } -/* - if (info->type == EN_LUTF_AGENT) { - rc = send_hb(master, info->hb_info.node_name, - info->hb_info.agent_telnet_port, - info->type); - if (rc != EN_LUTF_RC_OK) { - master_connected = false; - iMaxSelectFd = get_highest_fd(); - } + + iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr, + htons(info->hb_info.master_address.sin_port), + true, false); + if (iConnFd < 0) { + PERROR("establishTCPConnection failure: %s. Clearing set", + lutf_rc2str(iConnFd)); + release_agent_blk(master, true); + PERROR("Disconnected from master. Will attempt to reconnect"); + master_connected = false; + continue; } -*/ + + master->iFileDesc = iConnFd; + memcpy(&master->addr, + &info->hb_info.master_address, + sizeof(master->addr)); + strncpy(master->name, g_lutf_cfg.master_name, + MAX_STR_LEN); + master->name[MAX_STR_LEN-1] = '\0'; + master->node_type = EN_LUTF_MASTER; + gethostname(master->hostname, MAX_STR_LEN); + master->telnet_port = info->hb_info.agent_telnet_port; + + PDEBUG("Connected to master %s on fd %d", + master->name, master->iFileDesc); + /* - * Get the time stamp and go through each agent - * and see if it's still healthy. For agents which - * aren't healthy move off to the dead_list. - * This operation is only valid if I'm a master + * add the master FD to the select FD set + * to be able to process master messages */ - gettimeofday(&time_2, NULL); - if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) { - /* check if HB_TO seconds has passed since the last - * time we collected the time */ - if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) { - - /* do the heartbeat check */ - agent_hb_check(&time_1, info->type); - } + FD_SET(iConnFd, &g_tAllSet); + g_iMaxSelectFd = get_highest_fd(); + + master_connected = true; + master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED; + } + /* + if (info->type == EN_LUTF_AGENT) { + rc = send_hb(master, info->hb_info.node_name, + info->hb_info.agent_telnet_port, + info->type); + if (rc != EN_LUTF_RC_OK) { + master_connected = false; + g_iMaxSelectFd = get_highest_fd(); } + } + */ + /* + * Get the time stamp and go through each agent + * and see if it's still healthy. For agents which + * aren't healthy move off to the dead_list. 
+ * This operation is only valid if I'm a master + */ + gettimeofday(&time_2, NULL); + if (agent_get_hb() && info->type == EN_LUTF_MASTER) { + /* check if HB_TO seconds has passed since the last + * time we collected the time + */ + if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) { + /* do the heartbeat check */ + agent_hb_check(&time_1, info->type); + } + } + + if (time_2.tv_sec - time_1.tv_sec >= HB_TO) { + lutf_agent_blk_t *agent = NULL; + int idx = 0; + + do { + idx = get_next_active_agent(idx, &agent); + /* A master doesn't send a heart + * beat to himself + */ + if (agent) { + bool dead = false; + if (info->type == EN_LUTF_MASTER && + agent->id == master->id) + continue; - if (time_2.tv_sec - time_1.tv_sec >= HB_TO) { - lutf_agent_blk_t *agent = NULL; - int idx = 0; - - do { - idx = get_next_active_agent(idx, &agent); - /* A master doesn't send a heart - * beat to himself */ - if (agent) { - if (info->type == EN_LUTF_MASTER && - agent->id == master->id) - continue; - int agent_id = agent->id; - rc = send_hb(agent, info->hb_info.node_name, - info->hb_info.agent_telnet_port, - info->type); - if (rc != EN_LUTF_RC_OK) { - if (agent->id == master->id) { - PERROR("Disconnected from master. Will attempt to reconnect"); - master_connected = false; - } - iMaxSelectFd = close_agent_connection(agent); - release_agent_blk(agent); - free_agent_blk(agent_id); - } else { - release_agent_blk(agent); + rc = send_hb(agent, info->hb_info.node_name, + info->hb_info.agent_telnet_port, + info->type); + if (rc != EN_LUTF_RC_OK) { + if (agent->id == master->id) { + PERROR("Disconnected from master. Will attempt to reconnect"); + master_connected = false; } + dead = true; } - } while (agent); - } + release_agent_blk(agent, dead); + } + } while (agent); } + /* store the current time */ - time_1 = time_2; + memcpy(&time_1, &time_2, sizeof(time_1)); } /* Zero out the g_tAllSet */ diff --git a/lustre/tests/lutf/src/lutf_listener.h b/lustre/tests/lutf/src/lutf_listener.h index 629f677..8327c47 100644 --- a/lustre/tests/lutf/src/lutf_listener.h +++ b/lustre/tests/lutf/src/lutf_listener.h @@ -2,6 +2,7 @@ #define LUTF_LISTENER_H #include "lutf_common.h" +#include "lutf_agent.h" /* * lutf_listener_main @@ -11,4 +12,8 @@ void *lutf_listener_main(void *usr_data); void lutf_listener_shutdown(void); +void close_agent_connection(lutf_agent_blk_t *agent); + +void release_dead_list_agents(void); + #endif /* LUTF_LISTENER_H */ diff --git a/lustre/tests/lutf/src/lutf_python.c b/lustre/tests/lutf/src/lutf_python.c index a3efe9d..20682c1 100644 --- a/lustre/tests/lutf/src/lutf_python.c +++ b/lustre/tests/lutf/src/lutf_python.c @@ -5,7 +5,7 @@ #include "lutf_listener.h" extern lutf_config_params_t g_lutf_cfg; -bool g_py_inited = false; +bool g_py_inited; static char *get_path_segment(char *path, int *len, char **more) { @@ -27,14 +27,14 @@ static char *get_path_segment(char *path, int *len, char **more) return str; } -static void python_run_interactive_shell(void) +static lutf_rc_t python_run_interactive_shell(void) { char buf[MAX_STR_LEN + 20]; char segment[MAX_STR_LEN]; char *seg; char *more = g_lutf_cfg.py_path; int len = 0; - lutf_rc_t rc; + lutf_rc_t rc = EN_LUTF_RC_OK; PyRun_SimpleString("import code\n"); PyRun_SimpleString("import os\n"); @@ -76,7 +76,8 @@ static void python_run_interactive_shell(void) rc = wait_for_agents(g_lutf_cfg.agents, 20); if (rc == EN_LUTF_RC_TIMEOUT) { PERROR("Not all agents connected. 
Aborting tests"); - return; + lutf_listener_shutdown(); + return EN_LUTF_RC_TIMEOUT; } /* update the LUTF internal database */ @@ -84,18 +85,32 @@ static void python_run_interactive_shell(void) PDEBUG("Agents reloaded. Dumping"); PyRun_SimpleString("agents.dump()"); + /* if a script is specified then a unique suite must be + * specified as well. + * if a suite_list is specified it takes precedence over + * a single suite parameters. A pattern of scripts can be + * provided to run matching scripts. + * If a suite parameter is provided then run that + * particular suite. + * Otherwise just run everything. + */ if (g_lutf_cfg.script && strlen(g_lutf_cfg.script) > 0) { snprintf(buf, MAX_STR_LEN, "suites['%s'].scripts['%s'].run()", g_lutf_cfg.suite, g_lutf_cfg.script); + } else if (g_lutf_cfg.suite_list && + strlen(g_lutf_cfg.suite_list) > 0) { + snprintf(buf, MAX_STR_LEN, + "suites.run(suite_list='%s', match='%s')", + g_lutf_cfg.suite_list, pattern); } else if (g_lutf_cfg.suite && strlen(g_lutf_cfg.suite) > 0) { snprintf(buf, MAX_STR_LEN, "suites['%s'].run('%s')", g_lutf_cfg.suite, pattern); } else { snprintf(buf, MAX_STR_LEN, - "suites.run('%s')", pattern); + "suites.run(match='%s')", pattern); } PDEBUG("%s", buf); PyRun_SimpleString(buf); @@ -106,7 +121,7 @@ static void python_run_interactive_shell(void) PDEBUG("Shutting down the LUTF"); PyRun_SimpleString("me.exit()"); lutf_listener_shutdown(); - return; + return EN_LUTF_RC_OK; } else if (g_lutf_cfg.shell == EN_LUTF_RUN_INTERACTIVE) { int rc; char *intro; @@ -137,36 +152,43 @@ static void python_run_interactive_shell(void) */ PDEBUG("Running in Daemon mode"); sprintf(segment, "fname = os.path.join('%s', '%s')\n", - g_lutf_cfg.lutf_path, OUT_PY_LOG); + g_lutf_cfg.tmp_dir, OUT_PY_LOG); if (PyRun_SimpleString(segment)) { PERROR("Failed to create log file"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } sprintf(segment, "logfile = open(fname, 'w')\n"); if (PyRun_SimpleString(segment)) { PERROR("Failed to open log file"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } if (PyRun_SimpleString("sys.stdout = sys.stderr = logfile\n")) { PERROR("Failed to redirect stdout and stderr"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } if (PyRun_SimpleString("from lutf_telnet_sr import LutfTelnetServer\n")) { PERROR("Failed to import LutfTelnetServer"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } sprintf(segment, "tns = LutfTelnetServer(%d)\n", g_lutf_cfg.l_info.hb_info.agent_telnet_port); if (PyRun_SimpleString(segment)) { PERROR("Failed to instantiate LutfTelnetServer"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } if (PyRun_SimpleString("tns.run()\n")) { PERROR("Failed to run LutfTelnetServer instance"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } if (PyRun_SimpleString("logfile.close()")) { PERROR("Failed to close logfile"); + rc = EN_LUTF_RC_FAIL; goto python_shutdown; } python_shutdown: @@ -174,6 +196,8 @@ python_shutdown: } g_py_inited = false; lutf_listener_shutdown(); + + return rc; } /* @@ -183,6 +207,7 @@ python_shutdown: lutf_rc_t python_init(void) { wchar_t program[5]; + lutf_rc_t rc; swprintf(program, 3, L"%hs", "lutf"); @@ -199,14 +224,14 @@ lutf_rc_t python_init(void) //PySys_SetPath(new_path); //path = Py_GetPath(); - python_run_interactive_shell(); + rc = python_run_interactive_shell(); PDEBUG("Python finalizing"); Py_Finalize(); PDEBUG("Python finalized"); - return EN_LUTF_RC_OK; + return rc; } lutf_rc_t python_handle_rpc_request(char *rpc) @@ -227,10 +252,10 @@ lutf_rc_t python_handle_rpc_request(char *rpc) gstate = 
PyGILState_Ensure(); - str = PyUnicode_FromString((char*)"lutf"); + str = PyUnicode_FromString((char *)"lutf"); lutf = PyImport_Import(str); - me = PyObject_GetAttrString(lutf, (char*)"me"); - handle_rpc_req = PyObject_GetAttrString(me, (char*)"handle_rpc_req"); + me = PyObject_GetAttrString(lutf, (char *)"me"); + handle_rpc_req = PyObject_GetAttrString(me, (char *)"handle_rpc_req"); args = PyTuple_Pack(1, PyUnicode_FromString(rpc)); result = PyObject_CallObject(handle_rpc_req, args); -- 1.8.3.1
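
Editor's note: the sketch below is not part of the patch. It is a minimal, hedged illustration of the reference-counting scheme this change moves to in liblutf_agent.c: an agent block stays on a "live" list while usable, is parked on a "dead" list once its connection fails, and is only freed when the last reference is dropped. All names (struct agent, agent_get, agent_put, MAX_AGENTS) are illustrative, not the LUTF identifiers.

/* Minimal sketch of live/dead list reference counting, assuming a
 * consistent lock order of array lock before per-agent lock.
 */
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define MAX_AGENTS 128

struct agent {
	pthread_mutex_t lock;
	int id;
	int ref_count;
	bool dead;
};

static pthread_mutex_t array_lock = PTHREAD_MUTEX_INITIALIZER;
static struct agent *live[MAX_AGENTS];
static struct agent *dead[MAX_AGENTS];

/* Take a reference on a live agent by slot index, or return NULL. */
static struct agent *agent_get(int id)
{
	struct agent *a;

	pthread_mutex_lock(&array_lock);
	a = live[id];
	if (a) {
		pthread_mutex_lock(&a->lock);
		a->ref_count++;
		pthread_mutex_unlock(&a->lock);
	}
	pthread_mutex_unlock(&array_lock);
	return a;
}

/* Drop a reference. If the caller saw the connection die, park the
 * block on the dead list so a later pass can reclaim it once it is
 * no longer referenced; the last reference frees it outright.
 */
static void agent_put(struct agent *a, bool mark_dead)
{
	pthread_mutex_lock(&array_lock);
	pthread_mutex_lock(&a->lock);
	assert(a->ref_count > 0);
	a->ref_count--;

	if (a->ref_count == 0) {
		/* last reference: unlink from whichever list holds it */
		if (a->dead)
			dead[a->id] = NULL;
		else
			live[a->id] = NULL;
		pthread_mutex_unlock(&a->lock);
		pthread_mutex_unlock(&array_lock);
		pthread_mutex_destroy(&a->lock);
		free(a);
		return;
	}

	if (mark_dead && !a->dead) {
		/* still referenced: move from the live to the dead list */
		a->dead = true;
		live[a->id] = NULL;
		dead[a->id] = a;
	}
	pthread_mutex_unlock(&a->lock);
	pthread_mutex_unlock(&array_lock);
}

The point of the split into release_agent_blk(agent, dead) plus release_dead_list_agents() in the patch is that the listener thread, not the RPC caller, owns the select() fd set, so dead blocks are only fully torn down from the listener loop.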
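
A second hedged sketch, this time of the connect-with-timeout pattern that liblutf_connect.c's doNonBlockingConnect()/establishTCPConnection() follow: start connect() on a non-blocking socket, select() on the descriptor, then read SO_ERROR to learn the real outcome. This is a generic standalone version with invented names (connect_timeout); error handling is abbreviated and the socket is assumed to be created by the caller.

#include <errno.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/socket.h>

static int connect_timeout(int fd, const struct sockaddr *sa,
			   socklen_t salen, int secs)
{
	fd_set wset;
	struct timeval tv = { .tv_sec = secs, .tv_usec = 0 };
	int err = 0;
	socklen_t len = sizeof(err);
	int flags = fcntl(fd, F_GETFL, 0);

	fcntl(fd, F_SETFL, flags | O_NONBLOCK);

	if (connect(fd, sa, salen) == 0)
		goto done;		/* connected immediately */
	if (errno != EINPROGRESS)
		return -1;		/* hard failure */

	FD_ZERO(&wset);
	FD_SET(fd, &wset);
	if (select(fd + 1, NULL, &wset, NULL, secs ? &tv : NULL) <= 0) {
		errno = ETIMEDOUT;
		return -1;		/* timed out or select() failed */
	}

	/* connect finished; SO_ERROR says whether it succeeded */
	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err) {
		if (err)
			errno = err;
		return -1;
	}
done:
	fcntl(fd, F_SETFL, flags);	/* restore blocking mode */
	return 0;
}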
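
Finally, a hedged sketch of the "handle python failures properly" part of the commit message: in lutf_python.c every PyRun_SimpleString() call is now checked and the first failure is converted into an error return instead of being ignored. The snippet below shows only that pattern against the CPython embedding API (Python.h, Python 3 headers required); the function name, rc enum, and log file name are invented for illustration and are not the LUTF ones.

#include <Python.h>

enum rc { RC_OK = 0, RC_FAIL = -1 };

/* Run a couple of setup statements in an embedded interpreter and
 * propagate the first failure to the caller.
 */
static enum rc run_daemon_boilerplate(const char *logdir)
{
	char buf[256];
	enum rc rc = RC_OK;

	Py_Initialize();

	/* redirect stdout/stderr to a log file under the temp directory */
	snprintf(buf, sizeof(buf),
		 "import sys, os\n"
		 "logfile = open(os.path.join('%s', 'lutf_py.log'), 'w')\n"
		 "sys.stdout = sys.stderr = logfile\n", logdir);
	if (PyRun_SimpleString(buf)) {
		rc = RC_FAIL;	/* report instead of silently continuing */
		goto shutdown;
	}

	if (PyRun_SimpleString("logfile.close()"))
		rc = RC_FAIL;
shutdown:
	Py_Finalize();
	return rc;
}

In the patch itself the same idea lets python_init() return the result of python_run_interactive_shell(), and a wait_for_agents() timeout now shuts the listener down and returns EN_LUTF_RC_TIMEOUT rather than leaving the framework running with missing agents.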