Whamcloud - gitweb
LU-10973 lnet: LUTF infrastructure updates
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_agent.c
index 970c6c6..ea35271 100644 (file)
 #include "lutf_agent.h"
 #include "lutf.h"
 #include "lutf_python.h"
+#include "lutf_listener.h"
 
 static pthread_mutex_t agent_array_mutex;
 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
+static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
 /* TODO: this is probably not thread safe */
 static char agent_state_str[128];
 
-extern bool g_agent_enable_hb;
-extern struct in_addr g_local_ip;
+static bool g_agent_enable_hb = true;
+static struct in_addr g_local_ip;
 
 #define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
 
@@ -33,15 +35,111 @@ char *get_local_ip()
        return inet_ntoa(g_local_ip);
 }
 
-void release_agent_blk(lutf_agent_blk_t *agent)
+/*
+ * Place @agent in the first free slot of agent_dead_list.
+ *
+ * On success the agent is flagged LUTF_AGENT_STATE_DEAD and agent->id is
+ * overwritten with the index of the dead-list slot, so after this call
+ * agent->id no longer refers to a position in agent_live_list.
+ *
+ * The only caller visible here, release_agent_blk(), invokes this with
+ * both agent_array_mutex and agent->mutex held.
+ */
+static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
 {
-       /* release the agent blk mutex */
-       MUTEX_LOCK(&agent->mutex);
-       if (agent) {
-               assert(agent->ref_count != 0);
-               agent->ref_count--;
+       int i = 0;
+
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               if (agent_dead_list[i] == NULL) {
+                       agent->state |= LUTF_AGENT_STATE_DEAD;
+                       agent_dead_list[i] = agent;
+                       agent->id = i;
+                       break;
+               }
+       }
+       /* i only reaches MAX_NUM_AGENTS when no free slot was found */
+       assert(i < MAX_NUM_AGENTS);
+}
+
+/*
+ * Drop one reference on an agent that sits on agent_dead_list and free
+ * the block once the last reference is gone.
+ *
+ * @agent must be flagged LUTF_AGENT_STATE_DEAD and must occupy the
+ * agent_dead_list slot named by agent->id; both are asserted.
+ *
+ * The callers visible here invoke this with agent_array_mutex held and
+ * agent->mutex released. @agent must not be touched after this returns,
+ * since it may have been freed.
+ */
+static void del_dead_agent_locked(lutf_agent_blk_t *agent)
+{
+       assert(agent &&
+              agent->state & LUTF_AGENT_STATE_DEAD &&
+              agent_dead_list[agent->id] != NULL &&
+              agent_dead_list[agent->id] == agent);
+
+       assert(agent->ref_count > 0);
+       agent->ref_count--;
+
+       if (agent->ref_count == 0) {
+               agent_dead_list[agent->id] = NULL;
+               /* poison the block before freeing it; memset() converts
+                * its fill value to unsigned char, so spell out the single
+                * byte instead of passing a 32-bit pattern that is
+                * silently truncated
+                */
+               memset(agent, 0xef, sizeof(*agent));
+               free(agent);
        }
+}
+
+/*
+ * Walk agent_dead_list and, for every agent flagged
+ * LUTF_AGENT_NEED_LISTEN_CLEAN, clear that flag and drop one reference
+ * via del_dead_agent_locked(), which frees the agent when it was the
+ * last reference.
+ *
+ * Takes agent_array_mutex for the whole walk.
+ */
+void release_dead_list_agents(void)
+{
+       int i;
+
+       MUTEX_LOCK(&agent_array_mutex);
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               lutf_agent_blk_t *agent = agent_dead_list[i];
+               bool need_clean;
+
+               if (!agent)
+                       continue;
+
+               /* agent->state is updated under agent->mutex by
+                * set_agent_state()/unset_agent_state(); test and clear
+                * the flag under the same lock so a concurrent update is
+                * not lost
+                */
+               MUTEX_LOCK(&agent->mutex);
+               need_clean = (agent->state &
+                             LUTF_AGENT_NEED_LISTEN_CLEAN) != 0;
+               agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
+               MUTEX_UNLOCK(&agent->mutex);
+
+               /* may free the agent, so agent->mutex must already be
+                * released
+                */
+               if (need_clean)
+                       del_dead_agent_locked(agent);
+       }
+       MUTEX_UNLOCK(&agent_array_mutex);
+}
+
+/*
+ * Return true when @agent currently has LUTF_AGENT_STATE_ALIVE set.
+ * The state is sampled under agent->mutex; @agent must not be NULL.
+ */
+static inline bool agent_alive(lutf_agent_blk_t *agent)
+{
+       bool alive;
+
+       MUTEX_LOCK(&agent->mutex);
+       alive = (agent->state & LUTF_AGENT_STATE_ALIVE) != 0;
        MUTEX_UNLOCK(&agent->mutex);
+
+       return alive;
+}
+
+/*
+ * Drop one reference on @agent.
+ *
+ * @agent: agent block to release; must not be NULL.
+ * @dead:  non-zero when the caller considers the agent unreachable.
+ *
+ * - Agent not flagged ALIVE: it is already on the dead list, so the
+ *   reference is dropped through del_dead_agent_locked().
+ * - Last reference on a live agent: the agent is removed from
+ *   agent_live_list, its connection is closed and the block is freed.
+ * - References remain and @dead is set: the agent is moved from
+ *   agent_live_list to agent_dead_list, its ALIVE and channel-connected
+ *   flags are cleared and its connection is closed.
+ * - Otherwise only the reference count changes.
+ *
+ * The caller must not use @agent after this returns unless it holds
+ * another reference.
+ */
+void release_agent_blk(lutf_agent_blk_t *agent, int dead)
+{
+       assert(agent);
+
+       MUTEX_LOCK(&agent_array_mutex);
+       MUTEX_LOCK(&agent->mutex);
+
+       if (!(agent->state & LUTF_AGENT_STATE_ALIVE)) {
+               /* del_dead_agent_locked() may free the agent, so drop
+                * agent->mutex first
+                */
+               MUTEX_UNLOCK(&agent->mutex);
+               del_dead_agent_locked(agent);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               return;
+       }
+
+       /* sanity check */
+       assert(agent_live_list[agent->id] != NULL &&
+              agent_live_list[agent->id] == agent);
+
+       assert(agent->ref_count > 0);
+       agent->ref_count--;
+
+       if (agent->ref_count == 0) {
+               agent_live_list[agent->id] = NULL;
+               assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               close_agent_connection(agent);
+               /* poison the block; memset() only uses the low byte of
+                * its fill value
+                */
+               memset(agent, 0xef, sizeof(*agent));
+               /* free the block */
+               free(agent);
+       } else if (dead) {
+               agent_live_list[agent->id] = NULL;
+               insert_dead_agent_locked(agent);
+               /* agent->id now indexes agent_dead_list. Clear ALIVE and
+                * the channel flags before dropping the locks; otherwise
+                * a concurrent release would still see ALIVE and use the
+                * dead-list id against agent_live_list
+                */
+               agent->state &= ~(LUTF_AGENT_STATE_ALIVE |
+                                 LUTF_AGENT_RPC_CHANNEL_CONNECTED |
+                                 LUTF_AGENT_HB_CHANNEL_CONNECTED);
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+               /* NOTE(review): runs without locks on a block this caller
+                * no longer holds a reference to - confirm the remaining
+                * holders cannot free it concurrently
+                */
+               close_agent_connection(agent);
+       } else {
+               MUTEX_UNLOCK(&agent->mutex);
+               MUTEX_UNLOCK(&agent_array_mutex);
+       }
 }
 
 void acquire_agent_blk(lutf_agent_blk_t *agent)
@@ -67,8 +165,7 @@ char *agent_state2str(lutf_agent_blk_t *agent)
        return agent_state_str;
 }
 
-static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
-                                               struct sockaddr_in *addr)
+static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
 {
        int i;
        lutf_agent_blk_t *agent;
@@ -78,10 +175,11 @@ static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
 
        MUTEX_LOCK(&agent_array_mutex);
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
-               agent = list[i];
-               if ((agent) &&
+               agent = agent_live_list[i];
+               if ((agent) && agent_alive(agent) &&
                    (agent->addr.sin_addr.s_addr ==
                     addr->sin_addr.s_addr)) {
+                       acquire_agent_blk(agent);
                        MUTEX_UNLOCK(&agent_array_mutex);
                        return agent;
                }
@@ -102,7 +200,7 @@ int get_next_active_agent(int idx, lutf_agent_blk_t **out)
        MUTEX_LOCK(&agent_array_mutex);
        for (i = idx; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if (agent) {
+               if (agent && agent_alive(agent)) {
                        i++;
                        acquire_agent_blk(agent);
                        break;
@@ -119,13 +217,11 @@ out:
+/*
+ * Return the live agent whose address matches @addr, or hand the request
+ * to find_free_agent_blk() when no such agent exists.
+ *
+ * find_agent_blk_by_addr() returns the agent with a reference taken; that
+ * reference is dropped again here, so an existing agent is returned
+ * without an extra reference held on behalf of the caller.
+ * NOTE(review): find_free_agent_blk() is not fully visible here - confirm
+ * what reference, if any, a newly created block carries.
+ */
 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
 {
        lutf_agent_blk_t *agent;
-       agent = find_agent_blk_by_addr(agent_live_list, addr);
+
+       agent = find_agent_blk_by_addr(addr);
        if (!agent)
                return find_free_agent_blk(addr);
-
-       MUTEX_LOCK(&agent_array_mutex);
-       acquire_agent_blk(agent);
-       MUTEX_UNLOCK(&agent_array_mutex);
+       /* balance the reference taken by find_agent_blk_by_addr() */
+       release_agent_blk(agent, false);
 
        return agent;
 }
@@ -161,6 +257,11 @@ void agent_enable_hb(void)
        g_agent_enable_hb = true;
 }
 
+/*
+ * Report the current value of the g_agent_enable_hb flag: non-zero when
+ * it is set, zero otherwise.
+ */
+int agent_get_hb(void)
+{
+       int hb_enabled = g_agent_enable_hb;
+
+       return hb_enabled;
+}
+
 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
 {
        int i = 0;
@@ -170,8 +271,10 @@ lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
        MUTEX_LOCK(&agent_array_mutex);
 
        /* iterate through the array to find a free entry */
-       while (agent_live_list[i] != NULL)
-               i++;
+       for (i = 0; i < MAX_NUM_AGENTS; i++) {
+               if (agent_live_list[i] == NULL)
+                       break;
+       }
 
        if (i >= MAX_NUM_AGENTS) {
                MUTEX_UNLOCK(&agent_array_mutex);
@@ -226,7 +329,10 @@ lutf_agent_blk_t *find_agent_blk_by_id(int idx)
 
        agent = agent_live_list[idx];
 
-       acquire_agent_blk(agent);
+       if (agent_alive(agent))
+               acquire_agent_blk(agent);
+       else
+               agent = NULL;
 
        /* release the array mutex */
        MUTEX_UNLOCK(&agent_array_mutex);
@@ -244,53 +350,9 @@ void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
 
+/*
+ * Clear the bits in @state from agent->state while holding agent->mutex.
+ *
+ * This only updates the flags; it no longer frees the agent block as a
+ * side effect.
+ */
 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
 {
-       bool zombie = false;
-
        MUTEX_LOCK(&agent->mutex);
        agent->state &= ~state;
-       if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
-           (agent->state & LUTF_AGENT_ZOMBIE))
-               zombie = true;
-       MUTEX_UNLOCK(&agent->mutex);
-
-       if (zombie)
-               free_agent_blk(agent->id);
-}
-
-void free_agent_blk(int id)
-{
-       lutf_agent_blk_t *agent;
-
-       /* grab the array mutex */
-       MUTEX_LOCK(&agent_array_mutex);
-
-       /* if the blk is non null grab the mutex.
-        * possibly block until previous user is done
-        */
-       if (agent_live_list[id] == NULL) {
-               MUTEX_UNLOCK(&agent_array_mutex);
-               return;
-       }
-
-       agent = agent_live_list[id];
-
-       MUTEX_LOCK(&agent->mutex);
-       if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
-               MUTEX_UNLOCK(&agent->mutex);
-               MUTEX_UNLOCK(&agent_array_mutex);
-               PDEBUG("delay deleting agent %s\n", agent->name);
-               set_agent_state(agent, LUTF_AGENT_ZOMBIE);
-               return;
-       }
        MUTEX_UNLOCK(&agent->mutex);
-
-       agent_live_list[id] = NULL;
-
-       /* release the array mutex */
-       MUTEX_UNLOCK(&agent_array_mutex);
-
-       /* free the block */
-       free(agent);
 }
 
 char *agent_ip2str(lutf_agent_blk_t *agent)
@@ -305,6 +367,7 @@ int get_num_agents(void)
 {
        int i;
        int num = 0;
+
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                if (agent_live_list[i] != NULL)
                        num++;
@@ -313,8 +376,7 @@ int get_num_agents(void)
        return num;
 }
 
-/* no lock version of the function */
-static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
+lutf_agent_blk_t *find_agent_blk_by_name(char *name)
 {
        lutf_agent_blk_t *agent;
        int i;
@@ -326,10 +388,13 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
 
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if ((agent) &&
+               if ((agent) && agent_alive(agent) &&
                    ((strcmp(agent->name, name) == 0) ||
                     (strcmp(name, TEST_ROLE_GRC) == 0))) {
+                       acquire_agent_blk(agent);
                        break;
+               } else {
+                       agent = NULL;
                }
        }
 
@@ -339,18 +404,6 @@ static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
        return agent;
 }
 
-lutf_agent_blk_t *find_agent_blk_by_name(char *name)
-{
-       lutf_agent_blk_t *agent;
-
-       agent = find_agent_blk_by_name_nl(name);
-       if (agent)
-               acquire_agent_blk(agent);
-
-       /* return the agent blk */
-       return agent;
-}
-
 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
 {
        lutf_agent_blk_t *agent;
@@ -367,9 +420,12 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
 
        for (i = 0; i < MAX_NUM_AGENTS; i++) {
                agent = agent_live_list[i];
-               if ((agent) && (agent->addr.sin_addr.s_addr ==
+               if ((agent) && agent_alive(agent) &&
+                   (agent->addr.sin_addr.s_addr ==
                                addr.sin_addr.s_addr))
                        break;
+               else
+                       agent = NULL;
        }
 
        if (agent)
@@ -382,39 +438,6 @@ lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
        return agent;
 }
 
-void agent_hb_check(struct timeval *t, lutf_type_t me)
-{
-       lutf_agent_blk_t *agent;
-       int i;
-
-       /* grab the array mutex */
-       MUTEX_LOCK(&agent_array_mutex);
-
-       for (i = 0; i < MAX_NUM_AGENTS; i++) {
-               agent = agent_live_list[i];
-               if (agent && agent->node_type != me) {
-                       acquire_agent_blk(agent);
-                       if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
-                               int agent_id = agent->id;
-                               /* agent didn't send a HB move to dead
-                                * list
-                                */
-                               PERROR("agent %s presumed dead", agent->name);
-                               release_agent_blk(agent);
-                               MUTEX_UNLOCK(&agent_array_mutex);
-                               /* free_agent_blk() grabs the mutex */
-                               free_agent_blk(agent_id);
-                               MUTEX_LOCK(&agent_array_mutex);
-                               continue;
-                       }
-                       release_agent_blk(agent);
-               }
-       }
-
-       /* release the array mutex */
-       MUTEX_UNLOCK(&agent_array_mutex);
-}
-
 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
 {
        struct timeval start;
@@ -438,12 +461,13 @@ lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
                PDEBUG("Waiting for Agents");
                while (cYAML_get_next_seq_item(agents, &a) != NULL) {
                        PDEBUG("Looking up: %s", a->cy_valuestring);
-                       if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
+                       agent = find_agent_blk_by_name(a->cy_valuestring);
+                       if (agent) {
+                               PDEBUG("agent %s found\n", agent->name);
+                               release_agent_blk(agent, false);
+                       } else {
                                found = false;
                                break;
-                       } else {
-                               PDEBUG("agent %s found\n", agent->name);
-                               release_agent_blk(agent);
                        }
                }
                if (!found)
@@ -484,7 +508,8 @@ int get_num_agents_remote(char *masterIP, int masterPort)
                                                htons(masterPort),
                                                false, false);
        if (remoteSocket < 0) {
-               PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
+               PERROR("establishTCPConnection failure: %s",
+                      lutf_rc2str(remoteSocket));
                rc = remoteSocket;
                goto out;
        }
@@ -547,7 +572,9 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
                       agent_blk->name,
                       inet_ntoa(agent_blk->addr.sin_addr),
                       agent_blk->listen_port);
-               /* in network byte order, convert so we can have a uniform API */
+               /* in network byte order, convert so we can have a
+                * uniform API
+                */
                agent_blk->iRpcFd = establishTCPConnection(
                                agent_blk->addr.sin_addr.s_addr,
                                htons(agent_blk->listen_port),
@@ -591,7 +618,8 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
                goto fail_rpc;
        }
 
-       rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
+       rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
+                           timeout);
        if (rc != EN_LUTF_RC_OK) {
                PERROR("Failed to recv rpc body in timeout %d", timeout);
                goto fail_rpc;
@@ -604,15 +632,15 @@ lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
         * appropriately.
         */
        *rsp = recvBuf;
-       release_agent_blk(agent_blk);
-
        unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       release_agent_blk(agent_blk, false);
 
        return EN_LUTF_RC_OK;
 
 fail_rpc:
-       release_agent_blk(agent_blk);
        unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
+       release_agent_blk(agent_blk, true);
        if (recvBuf)
                free(recvBuf);
        msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
@@ -640,13 +668,14 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
        lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
        lutf_agent_blk_t *agent_blk;
        int msg_size;
+       bool dead = false;
 
        if (!agent || !yaml)
                goto out;
 
        msg_size = strlen(yaml) + 1;
 
-       agent_blk = find_agent_blk_by_name_nl(agent);
+       agent_blk = find_agent_blk_by_name(agent);
        if (!agent_blk) {
                PERROR("Can't find agent with name: %s", agent);
                goto out;
@@ -657,14 +686,21 @@ lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
                MUTEX_UNLOCK(&agent_blk->mutex);
                PERROR("agent_blk %s doesn't have an RPC channel",
                       agent_blk->name);
-               goto out;
+               goto release_agent;
        }
        MUTEX_UNLOCK(&agent_blk->mutex);
 
+       set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
        PDEBUG("sending rpc response\n%s", yaml);
        rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
                           EN_MSG_TYPE_RPC_RESPONSE);
-
+       if (rc)
+               dead = true;
+release_agent:
+       unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+       release_agent_blk(agent_blk, dead);
+       if (dead)
+               set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
 out:
        return rc;
 }