#include "lutf_agent.h"
#include "lutf.h"
#include "lutf_python.h"
+#include "lutf_listener.h"
static pthread_mutex_t agent_array_mutex;
static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
+static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
/* TODO: this is probably not thread safe */
static char agent_state_str[128];
-extern bool g_agent_enable_hb;
-extern struct in_addr g_local_ip;
+static bool g_agent_enable_hb = true;
+static struct in_addr g_local_ip;
#define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n"
return inet_ntoa(g_local_ip);
}
-void release_agent_blk(lutf_agent_blk_t *agent)
+/*
+ * Park @agent in the first free slot of agent_dead_list.
+ *
+ * The agent is flagged LUTF_AGENT_STATE_DEAD and agent->id is overwritten
+ * with the index of the dead-list slot it now occupies. Asserts when the
+ * dead list has no free slot.
+ *
+ * The visible caller, release_agent_blk(), invokes this with both
+ * agent_array_mutex and agent->mutex held.
+ */
+static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
{
- /* release the agent blk mutex */
- MUTEX_LOCK(&agent->mutex);
- if (agent) {
- assert(agent->ref_count != 0);
- agent->ref_count--;
+	int slot;
+
+	for (slot = 0; slot < MAX_NUM_AGENTS; slot++) {
+		/* occupied, keep looking */
+		if (agent_dead_list[slot] != NULL)
+			continue;
+
+		agent_dead_list[slot] = agent;
+		agent->id = slot;
+		agent->state |= LUTF_AGENT_STATE_DEAD;
+		break;
+	}
+	assert(slot < MAX_NUM_AGENTS);
+}
+
+/*
+ * Drop one reference on an agent that lives on agent_dead_list.
+ *
+ * Asserts that @agent is flagged LUTF_AGENT_STATE_DEAD and really occupies
+ * agent_dead_list[agent->id]. When the last reference goes away the slot
+ * is cleared, the block is poisoned and freed; @agent must not be touched
+ * by the caller afterwards.
+ *
+ * Both visible callers (release_dead_list_agents() and
+ * release_agent_blk()) hold agent_array_mutex around this call.
+ */
+static void del_dead_agent_locked(lutf_agent_blk_t *agent)
+{
+	assert(agent &&
+	       (agent->state & LUTF_AGENT_STATE_DEAD) &&
+	       agent_dead_list[agent->id] != NULL &&
+	       agent_dead_list[agent->id] == agent);
+
+	assert(agent->ref_count > 0);
+	agent->ref_count--;
+
+	if (agent->ref_count == 0) {
+		agent_dead_list[agent->id] = NULL;
+		/*
+		 * Poison the block before freeing it. memset() converts its
+		 * fill argument to unsigned char, so spell out the single
+		 * byte instead of passing a constant that does not fit in
+		 * an int.
+		 */
+		memset(agent, 0xef, sizeof(*agent));
+		free(agent);
}
+}
+
+/*
+ * Sweep agent_dead_list while holding agent_array_mutex.
+ *
+ * Every entry flagged LUTF_AGENT_NEED_LISTEN_CLEAN has that flag cleared
+ * and one reference dropped through del_dead_agent_locked(), which frees
+ * the block once its reference count reaches zero.
+ */
+void release_dead_list_agents(void)
+{
+	lutf_agent_blk_t *dead;
+	int idx;
+
+	MUTEX_LOCK(&agent_array_mutex);
+	for (idx = 0; idx < MAX_NUM_AGENTS; idx++) {
+		dead = agent_dead_list[idx];
+
+		/* empty slot, or nothing pending for this agent */
+		if (!dead || !(dead->state & LUTF_AGENT_NEED_LISTEN_CLEAN))
+			continue;
+
+		dead->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
+		del_dead_agent_locked(dead);
+	}
+	MUTEX_UNLOCK(&agent_array_mutex);
+}
+
+/*
+ * Report whether @agent currently has LUTF_AGENT_STATE_ALIVE set.
+ * The state word is sampled under agent->mutex.
+ */
+static inline bool agent_alive(lutf_agent_blk_t *agent)
+{
+	bool alive;
+
+	MUTEX_LOCK(&agent->mutex);
+	alive = (agent->state & LUTF_AGENT_STATE_ALIVE) != 0;
MUTEX_UNLOCK(&agent->mutex);
+
+	return alive;
+}
+
+/*
+ * Drop one reference on @agent.
+ *
+ * @agent: agent block to release; must not be NULL.
+ * @dead:  non-zero when the caller considers the agent gone. If other
+ *         references remain, the block is moved from agent_live_list to
+ *         agent_dead_list and its connection is closed.
+ *
+ * An agent that is no longer flagged LUTF_AGENT_STATE_ALIVE is handed to
+ * del_dead_agent_locked(). For a live agent the reference count is
+ * decremented; when it reaches zero the live slot is cleared, the
+ * connection is closed and the block is poisoned and freed.
+ *
+ * Lock order is agent_array_mutex first, then agent->mutex.
+ */
+void release_agent_blk(lutf_agent_blk_t *agent, int dead)
+{
+	assert(agent);
+
+	MUTEX_LOCK(&agent_array_mutex);
+	MUTEX_LOCK(&agent->mutex);
+
+	if (!(agent->state & LUTF_AGENT_STATE_ALIVE)) {
+		/* already on the dead list: drop the reference there */
+		MUTEX_UNLOCK(&agent->mutex);
+		del_dead_agent_locked(agent);
+		MUTEX_UNLOCK(&agent_array_mutex);
+		return;
+	}
+
+	/* sanity check */
+	assert(agent_live_list[agent->id] != NULL &&
+	       agent_live_list[agent->id] == agent);
+
+	assert(agent->ref_count > 0);
+	agent->ref_count--;
+
+	if (agent->ref_count == 0) {
+		agent_live_list[agent->id] = NULL;
+		assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
+		MUTEX_UNLOCK(&agent->mutex);
+		MUTEX_UNLOCK(&agent_array_mutex);
+		close_agent_connection(agent);
+		/*
+		 * Poison the block before freeing it. memset() only uses
+		 * the low byte of its fill argument, so pass that byte.
+		 */
+		memset(agent, 0xef, sizeof(*agent));
+		free(agent);
+	} else if (dead) {
+		agent_live_list[agent->id] = NULL;
+		insert_dead_agent_locked(agent);
+		/*
+		 * insert_dead_agent_locked() re-purposed agent->id as a
+		 * dead-list index, so the ALIVE flag has to go before the
+		 * locks are dropped. Otherwise a concurrent release would
+		 * take the live path and trip the live-list sanity check.
+		 */
+		agent->state &= ~(LUTF_AGENT_STATE_ALIVE |
+				  LUTF_AGENT_RPC_CHANNEL_CONNECTED |
+				  LUTF_AGENT_HB_CHANNEL_CONNECTED);
+		MUTEX_UNLOCK(&agent->mutex);
+		MUTEX_UNLOCK(&agent_array_mutex);
+		/*
+		 * NOTE(review): the remaining reference holders may drop
+		 * the last reference concurrently; confirm that
+		 * close_agent_connection() cannot race with the free in
+		 * del_dead_agent_locked().
+		 */
+		close_agent_connection(agent);
+	} else {
+		MUTEX_UNLOCK(&agent->mutex);
+		MUTEX_UNLOCK(&agent_array_mutex);
+	}
}
void acquire_agent_blk(lutf_agent_blk_t *agent)
return agent_state_str;
}
-static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
- struct sockaddr_in *addr)
+static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
{
int i;
lutf_agent_blk_t *agent;
MUTEX_LOCK(&agent_array_mutex);
for (i = 0; i < MAX_NUM_AGENTS; i++) {
- agent = list[i];
- if ((agent) &&
+ agent = agent_live_list[i];
+ if ((agent) && agent_alive(agent) &&
(agent->addr.sin_addr.s_addr ==
addr->sin_addr.s_addr)) {
+ acquire_agent_blk(agent);
MUTEX_UNLOCK(&agent_array_mutex);
return agent;
}
MUTEX_LOCK(&agent_array_mutex);
for (i = idx; i < MAX_NUM_AGENTS; i++) {
agent = agent_live_list[i];
- if (agent) {
+ if (agent && agent_alive(agent)) {
i++;
acquire_agent_blk(agent);
break;
lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
{
lutf_agent_blk_t *agent;
- agent = find_agent_blk_by_addr(agent_live_list, addr);
+
+ agent = find_agent_blk_by_addr(addr);
if (!agent)
return find_free_agent_blk(addr);
-
- MUTEX_LOCK(&agent_array_mutex);
- acquire_agent_blk(agent);
- MUTEX_UNLOCK(&agent_array_mutex);
+ release_agent_blk(agent, false); /* NOTE(review): this drops the reference
+  * taken by find_agent_blk_by_addr(), so the pointer returned below is not
+  * pinned by this function - confirm callers rely on another reference. */
return agent;
}
g_agent_enable_hb = true;
}
+/* Return 1 when g_agent_enable_hb is set, 0 otherwise. */
+int agent_get_hb(void)
+{
+	return g_agent_enable_hb ? 1 : 0;
+}
+
lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
{
int i = 0;
MUTEX_LOCK(&agent_array_mutex);
/* iterate through the array to find a free entry */
- while (agent_live_list[i] != NULL)
- i++;
+ for (i = 0; i < MAX_NUM_AGENTS; i++) {
+ if (agent_live_list[i] == NULL)
+ break;
+ }
if (i >= MAX_NUM_AGENTS) {
MUTEX_UNLOCK(&agent_array_mutex);
agent = agent_live_list[idx];
- acquire_agent_blk(agent);
+ if (agent_alive(agent))
+ acquire_agent_blk(agent);
+ else
+ agent = NULL;
/* release the array mutex */
MUTEX_UNLOCK(&agent_array_mutex);
void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
{
- bool zombie = false;
-
MUTEX_LOCK(&agent->mutex);
agent->state &= ~state;
- if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
- (agent->state & LUTF_AGENT_ZOMBIE))
- zombie = true;
- MUTEX_UNLOCK(&agent->mutex);
-
- if (zombie)
- free_agent_blk(agent->id);
-}
-
-void free_agent_blk(int id)
-{
- lutf_agent_blk_t *agent;
-
- /* grab the array mutex */
- MUTEX_LOCK(&agent_array_mutex);
-
- /* if the blk is non null grab the mutex.
- * possibly block until previous user is done
- */
- if (agent_live_list[id] == NULL) {
- MUTEX_UNLOCK(&agent_array_mutex);
- return;
- }
-
- agent = agent_live_list[id];
-
- MUTEX_LOCK(&agent->mutex);
- if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
- MUTEX_UNLOCK(&agent->mutex);
- MUTEX_UNLOCK(&agent_array_mutex);
- PDEBUG("delay deleting agent %s\n", agent->name);
- set_agent_state(agent, LUTF_AGENT_ZOMBIE);
- return;
- }
MUTEX_UNLOCK(&agent->mutex);
-
- agent_live_list[id] = NULL;
-
- /* release the array mutex */
- MUTEX_UNLOCK(&agent_array_mutex);
-
- /* free the block */
- free(agent);
}
char *agent_ip2str(lutf_agent_blk_t *agent)
{
int i;
int num = 0;
+
for (i = 0; i < MAX_NUM_AGENTS; i++) {
if (agent_live_list[i] != NULL)
num++;
return num;
}
-/* no lock version of the function */
-static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
+/*
+ * Look up a live agent by name and take a reference on it.
+ *
+ * @name: agent name to match; when it equals TEST_ROLE_GRC the first
+ *        live agent in the list is returned regardless of its name.
+ *
+ * Returns the agent with its reference count raised through
+ * acquire_agent_blk(), or NULL when no live agent matches. The caller
+ * is expected to pair a non-NULL result with release_agent_blk().
+ *
+ * The scan runs under agent_array_mutex, like find_agent_blk_by_addr(),
+ * so an entry cannot be freed by release_agent_blk() while it is being
+ * inspected.
+ */
+lutf_agent_blk_t *find_agent_blk_by_name(char *name)
{
lutf_agent_blk_t *agent;
int i;
+
+	MUTEX_LOCK(&agent_array_mutex);
for (i = 0; i < MAX_NUM_AGENTS; i++) {
agent = agent_live_list[i];
- if ((agent) &&
+	if ((agent) && agent_alive(agent) &&
((strcmp(agent->name, name) == 0) ||
(strcmp(name, TEST_ROLE_GRC) == 0))) {
+		acquire_agent_blk(agent);
break;
+	} else {
+		agent = NULL;
}
}
+	MUTEX_UNLOCK(&agent_array_mutex);
return agent;
}
-lutf_agent_blk_t *find_agent_blk_by_name(char *name)
-{
- lutf_agent_blk_t *agent;
-
- agent = find_agent_blk_by_name_nl(name);
- if (agent)
- acquire_agent_blk(agent);
-
- /* return the agent blk */
- return agent;
-}
-
lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
{
lutf_agent_blk_t *agent;
for (i = 0; i < MAX_NUM_AGENTS; i++) {
agent = agent_live_list[i];
- if ((agent) && (agent->addr.sin_addr.s_addr ==
+ if ((agent) && agent_alive(agent) &&
+ (agent->addr.sin_addr.s_addr ==
addr.sin_addr.s_addr))
break;
+ else
+ agent = NULL;
}
if (agent)
return agent;
}
-void agent_hb_check(struct timeval *t, lutf_type_t me)
-{
- lutf_agent_blk_t *agent;
- int i;
-
- /* grab the array mutex */
- MUTEX_LOCK(&agent_array_mutex);
-
- for (i = 0; i < MAX_NUM_AGENTS; i++) {
- agent = agent_live_list[i];
- if (agent && agent->node_type != me) {
- acquire_agent_blk(agent);
- if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
- int agent_id = agent->id;
- /* agent didn't send a HB move to dead
- * list
- */
- PERROR("agent %s presumed dead", agent->name);
- release_agent_blk(agent);
- MUTEX_UNLOCK(&agent_array_mutex);
- /* free_agent_blk() grabs the mutex */
- free_agent_blk(agent_id);
- MUTEX_LOCK(&agent_array_mutex);
- continue;
- }
- release_agent_blk(agent);
- }
- }
-
- /* release the array mutex */
- MUTEX_UNLOCK(&agent_array_mutex);
-}
-
lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
{
struct timeval start;
PDEBUG("Waiting for Agents");
while (cYAML_get_next_seq_item(agents, &a) != NULL) {
PDEBUG("Looking up: %s", a->cy_valuestring);
- if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
+ agent = find_agent_blk_by_name(a->cy_valuestring);
+ if (agent) {
+ PDEBUG("agent %s found\n", agent->name);
+ release_agent_blk(agent, false);
+ } else {
found = false;
break;
- } else {
- PDEBUG("agent %s found\n", agent->name);
- release_agent_blk(agent);
}
}
if (!found)
htons(masterPort),
false, false);
if (remoteSocket < 0) {
- PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
+ PERROR("establishTCPConnection failure: %s",
+ lutf_rc2str(remoteSocket));
rc = remoteSocket;
goto out;
}
agent_blk->name,
inet_ntoa(agent_blk->addr.sin_addr),
agent_blk->listen_port);
- /* in network byte order, convert so we can have a uniform API */
+ /* in network byte order, convert so we can have a
+ * uniform API
+ */
agent_blk->iRpcFd = establishTCPConnection(
agent_blk->addr.sin_addr.s_addr,
htons(agent_blk->listen_port),
goto fail_rpc;
}
- rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
+ rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
+ timeout);
if (rc != EN_LUTF_RC_OK) {
PERROR("Failed to recv rpc body in timeout %d", timeout);
goto fail_rpc;
* appropriately.
*/
*rsp = recvBuf;
- release_agent_blk(agent_blk);
-
unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+ release_agent_blk(agent_blk, false);
return EN_LUTF_RC_OK;
fail_rpc:
- release_agent_blk(agent_blk);
unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+ set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
+ release_agent_blk(agent_blk, true);
if (recvBuf)
free(recvBuf);
msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
lutf_agent_blk_t *agent_blk;
int msg_size;
+ bool dead = false;
if (!agent || !yaml)
goto out;
msg_size = strlen(yaml) + 1;
- agent_blk = find_agent_blk_by_name_nl(agent);
+ agent_blk = find_agent_blk_by_name(agent);
if (!agent_blk) {
PERROR("Can't find agent with name: %s", agent);
goto out;
MUTEX_UNLOCK(&agent_blk->mutex);
PERROR("agent_blk %s doesn't have an RPC channel",
agent_blk->name);
- goto out;
+ goto release_agent;
}
MUTEX_UNLOCK(&agent_blk->mutex);
+ set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
PDEBUG("sending rpc response\n%s", yaml);
rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
EN_MSG_TYPE_RPC_RESPONSE);
-
+ if (rc)
+ dead = true;
+release_agent:
+ unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
+ release_agent_blk(agent_blk, dead);
+ if (dead)
+ set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
out:
return rc;
}