X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Ftests%2Flutf%2Fsrc%2Fliblutf_agent.c;h=ea352711e2100b36653bb60c61f3b37bdf8108de;hb=a55b6dafeae9b6e52a6b0afb9b9848ac4980b105;hp=970c6c69f382223e51ac65f29aba75eb99e5c501;hpb=10b842909a5e9dfa05f12e08baf6aae1fa97972f;p=fs%2Flustre-release.git diff --git a/lustre/tests/lutf/src/liblutf_agent.c b/lustre/tests/lutf/src/liblutf_agent.c index 970c6c6..ea35271 100644 --- a/lustre/tests/lutf/src/liblutf_agent.c +++ b/lustre/tests/lutf/src/liblutf_agent.c @@ -11,14 +11,16 @@ #include "lutf_agent.h" #include "lutf.h" #include "lutf_python.h" +#include "lutf_listener.h" static pthread_mutex_t agent_array_mutex; static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS]; +static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS]; /* TODO: this is probably not thread safe */ static char agent_state_str[128]; -extern bool g_agent_enable_hb; -extern struct in_addr g_local_ip; +static bool g_agent_enable_hb = true; +static struct in_addr g_local_ip; #define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n" @@ -33,15 +35,111 @@ char *get_local_ip() return inet_ntoa(g_local_ip); } -void release_agent_blk(lutf_agent_blk_t *agent) +static void insert_dead_agent_locked(lutf_agent_blk_t *agent) { - /* release the agent blk mutex */ - MUTEX_LOCK(&agent->mutex); - if (agent) { - assert(agent->ref_count != 0); - agent->ref_count--; + int i = 0; + + for (i = 0; i < MAX_NUM_AGENTS; i++) { + if (agent_dead_list[i] == NULL) { + agent->state |= LUTF_AGENT_STATE_DEAD; + agent_dead_list[i] = agent; + agent->id = i; + break; + } + } + assert(i < MAX_NUM_AGENTS); +} + +static void del_dead_agent_locked(lutf_agent_blk_t *agent) +{ + assert(agent && + agent->state & LUTF_AGENT_STATE_DEAD && + agent_dead_list[agent->id] != NULL && + agent_dead_list[agent->id] == agent); + + assert(agent->ref_count > 0); + agent->ref_count--; + + if (agent->ref_count == 0) { + agent_dead_list[agent->id] = NULL; + memset(agent, 0xdeadbeef, sizeof(*agent)); + free(agent); } +} + +void release_dead_list_agents(void) +{ + int i; + + MUTEX_LOCK(&agent_array_mutex); + for (i = 0; i < MAX_NUM_AGENTS; i++) { + lutf_agent_blk_t *agent; + + agent = agent_dead_list[i]; + + if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) { + agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN; + del_dead_agent_locked(agent); + } + } + MUTEX_UNLOCK(&agent_array_mutex); +} + +static inline bool agent_alive(lutf_agent_blk_t *agent) +{ + bool viable = false; + + MUTEX_LOCK(&agent->mutex); + if (agent->state & LUTF_AGENT_STATE_ALIVE) + viable = true; MUTEX_UNLOCK(&agent->mutex); + + return viable; +} + +void release_agent_blk(lutf_agent_blk_t *agent, int dead) +{ + assert(agent); + + MUTEX_LOCK(&agent_array_mutex); + MUTEX_LOCK(&agent->mutex); + + if (agent->state & LUTF_AGENT_STATE_ALIVE) { + /* sanity check */ + assert(agent_live_list[agent->id] != NULL && + agent_live_list[agent->id] == agent); + } else { + MUTEX_UNLOCK(&agent->mutex); + del_dead_agent_locked(agent); + MUTEX_UNLOCK(&agent_array_mutex); + return; + } + + assert(agent->ref_count > 0); + agent->ref_count--; + + if (agent->ref_count == 0) { + agent_live_list[agent->id] = NULL; + assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS)); + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + close_agent_connection(agent); + memset(agent, 0xdeadbeef, sizeof(*agent)); + /* free the block */ + free(agent); + } else if (dead) { + agent_live_list[agent->id] = NULL; + insert_dead_agent_locked(agent); + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE); + unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED); + unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED); + close_agent_connection(agent); + } else { + MUTEX_UNLOCK(&agent->mutex); + MUTEX_UNLOCK(&agent_array_mutex); + } } void acquire_agent_blk(lutf_agent_blk_t *agent) @@ -67,8 +165,7 @@ char *agent_state2str(lutf_agent_blk_t *agent) return agent_state_str; } -static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list, - struct sockaddr_in *addr) +static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr) { int i; lutf_agent_blk_t *agent; @@ -78,10 +175,11 @@ static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list, MUTEX_LOCK(&agent_array_mutex); for (i = 0; i < MAX_NUM_AGENTS; i++) { - agent = list[i]; - if ((agent) && + agent = agent_live_list[i]; + if ((agent) && agent_alive(agent) && (agent->addr.sin_addr.s_addr == addr->sin_addr.s_addr)) { + acquire_agent_blk(agent); MUTEX_UNLOCK(&agent_array_mutex); return agent; } @@ -102,7 +200,7 @@ int get_next_active_agent(int idx, lutf_agent_blk_t **out) MUTEX_LOCK(&agent_array_mutex); for (i = idx; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if (agent) { + if (agent && agent_alive(agent)) { i++; acquire_agent_blk(agent); break; @@ -119,13 +217,11 @@ out: lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr) { lutf_agent_blk_t *agent; - agent = find_agent_blk_by_addr(agent_live_list, addr); + + agent = find_agent_blk_by_addr(addr); if (!agent) return find_free_agent_blk(addr); - - MUTEX_LOCK(&agent_array_mutex); - acquire_agent_blk(agent); - MUTEX_UNLOCK(&agent_array_mutex); + release_agent_blk(agent, false); return agent; } @@ -161,6 +257,11 @@ void agent_enable_hb(void) g_agent_enable_hb = true; } +int agent_get_hb(void) +{ + return g_agent_enable_hb; +} + lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr) { int i = 0; @@ -170,8 +271,10 @@ lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr) MUTEX_LOCK(&agent_array_mutex); /* iterate through the array to find a free entry */ - while (agent_live_list[i] != NULL) - i++; + for (i = 0; i < MAX_NUM_AGENTS; i++) { + if (agent_live_list[i] == NULL) + break; + } if (i >= MAX_NUM_AGENTS) { MUTEX_UNLOCK(&agent_array_mutex); @@ -226,7 +329,10 @@ lutf_agent_blk_t *find_agent_blk_by_id(int idx) agent = agent_live_list[idx]; - acquire_agent_blk(agent); + if (agent_alive(agent)) + acquire_agent_blk(agent); + else + agent = NULL; /* release the array mutex */ MUTEX_UNLOCK(&agent_array_mutex); @@ -244,53 +350,9 @@ void set_agent_state(lutf_agent_blk_t *agent, unsigned int state) void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state) { - bool zombie = false; - MUTEX_LOCK(&agent->mutex); agent->state &= ~state; - if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) && - (agent->state & LUTF_AGENT_ZOMBIE)) - zombie = true; - MUTEX_UNLOCK(&agent->mutex); - - if (zombie) - free_agent_blk(agent->id); -} - -void free_agent_blk(int id) -{ - lutf_agent_blk_t *agent; - - /* grab the array mutex */ - MUTEX_LOCK(&agent_array_mutex); - - /* if the blk is non null grab the mutex. - * possibly block until previous user is done - */ - if (agent_live_list[id] == NULL) { - MUTEX_UNLOCK(&agent_array_mutex); - return; - } - - agent = agent_live_list[id]; - - MUTEX_LOCK(&agent->mutex); - if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) { - MUTEX_UNLOCK(&agent->mutex); - MUTEX_UNLOCK(&agent_array_mutex); - PDEBUG("delay deleting agent %s\n", agent->name); - set_agent_state(agent, LUTF_AGENT_ZOMBIE); - return; - } MUTEX_UNLOCK(&agent->mutex); - - agent_live_list[id] = NULL; - - /* release the array mutex */ - MUTEX_UNLOCK(&agent_array_mutex); - - /* free the block */ - free(agent); } char *agent_ip2str(lutf_agent_blk_t *agent) @@ -305,6 +367,7 @@ int get_num_agents(void) { int i; int num = 0; + for (i = 0; i < MAX_NUM_AGENTS; i++) { if (agent_live_list[i] != NULL) num++; @@ -313,8 +376,7 @@ int get_num_agents(void) return num; } -/* no lock version of the function */ -static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) +lutf_agent_blk_t *find_agent_blk_by_name(char *name) { lutf_agent_blk_t *agent; int i; @@ -326,10 +388,13 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) for (i = 0; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if ((agent) && + if ((agent) && agent_alive(agent) && ((strcmp(agent->name, name) == 0) || (strcmp(name, TEST_ROLE_GRC) == 0))) { + acquire_agent_blk(agent); break; + } else { + agent = NULL; } } @@ -339,18 +404,6 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name) return agent; } -lutf_agent_blk_t *find_agent_blk_by_name(char *name) -{ - lutf_agent_blk_t *agent; - - agent = find_agent_blk_by_name_nl(name); - if (agent) - acquire_agent_blk(agent); - - /* return the agent blk */ - return agent; -} - lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) { lutf_agent_blk_t *agent; @@ -367,9 +420,12 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) for (i = 0; i < MAX_NUM_AGENTS; i++) { agent = agent_live_list[i]; - if ((agent) && (agent->addr.sin_addr.s_addr == + if ((agent) && agent_alive(agent) && + (agent->addr.sin_addr.s_addr == addr.sin_addr.s_addr)) break; + else + agent = NULL; } if (agent) @@ -382,39 +438,6 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip) return agent; } -void agent_hb_check(struct timeval *t, lutf_type_t me) -{ - lutf_agent_blk_t *agent; - int i; - - /* grab the array mutex */ - MUTEX_LOCK(&agent_array_mutex); - - for (i = 0; i < MAX_NUM_AGENTS; i++) { - agent = agent_live_list[i]; - if (agent && agent->node_type != me) { - acquire_agent_blk(agent); - if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) { - int agent_id = agent->id; - /* agent didn't send a HB move to dead - * list - */ - PERROR("agent %s presumed dead", agent->name); - release_agent_blk(agent); - MUTEX_UNLOCK(&agent_array_mutex); - /* free_agent_blk() grabs the mutex */ - free_agent_blk(agent_id); - MUTEX_LOCK(&agent_array_mutex); - continue; - } - release_agent_blk(agent); - } - } - - /* release the array mutex */ - MUTEX_UNLOCK(&agent_array_mutex); -} - lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout) { struct timeval start; @@ -438,12 +461,13 @@ lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout) PDEBUG("Waiting for Agents"); while (cYAML_get_next_seq_item(agents, &a) != NULL) { PDEBUG("Looking up: %s", a->cy_valuestring); - if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) { + agent = find_agent_blk_by_name(a->cy_valuestring); + if (agent) { + PDEBUG("agent %s found\n", agent->name); + release_agent_blk(agent, false); + } else { found = false; break; - } else { - PDEBUG("agent %s found\n", agent->name); - release_agent_blk(agent); } } if (!found) @@ -484,7 +508,8 @@ int get_num_agents_remote(char *masterIP, int masterPort) htons(masterPort), false, false); if (remoteSocket < 0) { - PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket)); + PERROR("establishTCPConnection failure: %s", + lutf_rc2str(remoteSocket)); rc = remoteSocket; goto out; } @@ -547,7 +572,9 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) agent_blk->name, inet_ntoa(agent_blk->addr.sin_addr), agent_blk->listen_port); - /* in network byte order, convert so we can have a uniform API */ + /* in network byte order, convert so we can have a + * uniform API + */ agent_blk->iRpcFd = establishTCPConnection( agent_blk->addr.sin_addr.s_addr, htons(agent_blk->listen_port), @@ -591,7 +618,8 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) goto fail_rpc; } - rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout); + rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), + timeout); if (rc != EN_LUTF_RC_OK) { PERROR("Failed to recv rpc body in timeout %d", timeout); goto fail_rpc; @@ -604,15 +632,15 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp) * appropriately. */ *rsp = recvBuf; - release_agent_blk(agent_blk); - unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + release_agent_blk(agent_blk, false); return EN_LUTF_RC_OK; fail_rpc: - release_agent_blk(agent_blk); unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN); + release_agent_blk(agent_blk, true); if (recvBuf) free(recvBuf); msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+ @@ -640,13 +668,14 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml) lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL; lutf_agent_blk_t *agent_blk; int msg_size; + bool dead = false; if (!agent || !yaml) goto out; msg_size = strlen(yaml) + 1; - agent_blk = find_agent_blk_by_name_nl(agent); + agent_blk = find_agent_blk_by_name(agent); if (!agent_blk) { PERROR("Can't find agent with name: %s", agent); goto out; @@ -657,14 +686,21 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml) MUTEX_UNLOCK(&agent_blk->mutex); PERROR("agent_blk %s doesn't have an RPC channel", agent_blk->name); - goto out; + goto release_agent; } MUTEX_UNLOCK(&agent_blk->mutex); + set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); PDEBUG("sending rpc response\n%s", yaml); rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size, EN_MSG_TYPE_RPC_RESPONSE); - + if (rc) + dead = true; +release_agent: + unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS); + release_agent_blk(agent_blk, dead); + if (dead) + set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN); out: return rc; }