#include "lutf_listener.h"
static fd_set g_tAllSet;
-static bool g_bShutdown = false;
+static bool g_bShutdown;
static int g_iListenFd = INVALID_TCP_SOCKET;
-bool g_agent_enable_hb = true;
+static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
sServAddr.sin_port = htons(server_port);
- if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) {
+ if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
+ sizeof(sServAddr)) < 0) {
/* Cannot bind our listening socket. */
closeTcpConnection(g_iListenFd);
return EN_LUTF_RC_BIND_FAILED;
}
- /* Let the system know we wish to listen to this port for
- * connections. */
+ /* Let the system know we wish to listen to this port for
+ * connections.
+ */
if (listen(g_iListenFd, 2) < 0) {
/* Cannot listen to socket, close and fail */
closeTcpConnection(g_iListenFd);
return EN_LUTF_RC_OK;
}
-static inline int close_agent_connection(lutf_agent_blk_t *agent)
-{
- FD_CLR(agent->iFileDesc, &g_tAllSet);
- FD_CLR(agent->iRpcFd, &g_tAllSet);
- closeTcpConnection(agent->iRpcFd);
- closeTcpConnection(agent->iFileDesc);
- agent->state &=
- ~LUTF_AGENT_RPC_CHANNEL_CONNECTED;
- agent->state &=
- ~LUTF_AGENT_HB_CHANNEL_CONNECTED;
- return get_highest_fd();
-}
-
lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
int type)
{
/* we assume the first connection is an HB connection */
if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
if (agent->iFileDesc != INVALID_TCP_SOCKET) {
- PERROR("agent in unexpected state. "
- "state is %s, but HB FD is %d",
+ PERROR("agent in unexpected state. state is %s, but HB FD is %d",
agent_state2str(agent), fd);
return EN_LUTF_RC_SYS_ERR;
} else {
}
} else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
if (agent->iRpcFd != INVALID_TCP_SOCKET) {
- PERROR("agent in unexpected state. "
- "state is %s, but RPC FD is %d",
+ PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
agent_state2str(agent), fd);
return EN_LUTF_RC_SYS_ERR;
} else {
return EN_LUTF_RC_SYS_ERR;
}
+/*
+ * close_agent_connection
+ *   Close whichever of the agent's two sockets are currently open: the
+ *   heart-beat socket (iFileDesc) and the RPC socket (iRpcFd). Each open
+ *   descriptor is removed from the listener's select set (g_tAllSet),
+ *   closed, and marked invalid. The cached highest descriptor used for
+ *   select() (g_iMaxSelectFd) is then recomputed.
+ *
+ *   Safe to call repeatedly on the same agent: a descriptor that is
+ *   already INVALID_TCP_SOCKET is skipped.
+ *
+ *   agent: agent block whose connections are torn down; must not be NULL.
+ *
+ *   NOTE(review): the previous implementation also cleared
+ *   LUTF_AGENT_RPC_CHANNEL_CONNECTED and LUTF_AGENT_HB_CHANNEL_CONNECTED
+ *   in agent->state. This version does not touch agent->state - confirm
+ *   that the caller resets those bits.
+ */
+void close_agent_connection(lutf_agent_blk_t *agent)
+{
+	if (agent->iFileDesc != INVALID_TCP_SOCKET) {
+		FD_CLR(agent->iFileDesc, &g_tAllSet);
+		closeTcpConnection(agent->iFileDesc);
+		/* reset with the same sentinel the guard above tests for */
+		agent->iFileDesc = INVALID_TCP_SOCKET;
+	}
+	if (agent->iRpcFd != INVALID_TCP_SOCKET) {
+		FD_CLR(agent->iRpcFd, &g_tAllSet);
+		closeTcpConnection(agent->iRpcFd);
+		/* reset with the same sentinel the guard above tests for */
+		agent->iRpcFd = INVALID_TCP_SOCKET;
+	}
+	/* the closed descriptors may have been the highest ones in the set */
+	g_iMaxSelectFd = get_highest_fd();
+}
+
+/*
+ * agent_hb_check
+ *   Walk every agent slot (ids 0 .. MAX_NUM_AGENTS - 1) and look for
+ *   agents whose last recorded time stamp is too old.
+ *
+ *   An agent whose node_type differs from 'me' and whose time_stamp is
+ *   at least HB_TO * 100 seconds behind 't' is logged as presumed dead
+ *   and released with the second argument set to true. Every other
+ *   agent that was found is released with false.
+ *
+ *   t:  reference time the agents' time stamps are compared against
+ *   me: node type of the caller; agents of this type are never aged out
+ *
+ *   NOTE(review): presumably find_agent_blk_by_id() takes a reference
+ *   that release_agent_blk() drops - confirm against their definitions.
+ */
+void agent_hb_check(struct timeval *t, lutf_type_t me)
+{
+	int slot;
+
+	for (slot = 0; slot < MAX_NUM_AGENTS; slot++) {
+		lutf_agent_blk_t *blk = find_agent_blk_by_id(slot);
+		bool expired;
+
+		/* empty slot, nothing to check or release */
+		if (!blk)
+			continue;
+
+		expired = blk->node_type != me &&
+			  t->tv_sec - blk->time_stamp.tv_sec >= HB_TO*100;
+
+		/* agent didn't send a HB in time: report it before the
+		 * release below hands it over as dead
+		 */
+		if (expired)
+			PERROR("agent %s presumed dead", blk->name);
+
+		release_agent_blk(blk, expired);
+	}
+}
+
/*
* lutf_listener_main
* main loop. Listens for incoming agent connections, and for agent
socklen_t tCliLen;
fd_set tReadSet;
int iNReady;
- int iMaxSelectFd;
int i;
lutf_rc_t rc;
lutf_agent_blk_t *agent = NULL, *master = NULL;
agent_init();
- iMaxSelectFd = g_iListenFd;
-
- gettimeofday(&time_1, NULL);
+ g_iMaxSelectFd = g_iListenFd;
+ gettimeofday(&time_2, NULL);
- /* Main Processing Loop: Keep going until we have reason to shutdown. */
+ /* Main Processing Loop: Keep going until we have reason
+ * to shutdown.
+ */
while (!g_bShutdown) {
/* Wait on our select mask for an event to occur. */
- tReadSet = g_tAllSet;
-
select_to.tv_sec = HB_TO;
select_to.tv_usec = 0;
- iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to);
+ FD_ZERO(&tReadSet);
+ tReadSet = g_tAllSet;
+ iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
+ &select_to);
+
+ release_dead_list_agents();
/* Determine if we failed the select call */
if (iNReady < 0) {
/* Check to see if we were interrupted by a signal. */
if ((errno == EINTR) || (errno == EAGAIN)) {
PERROR("Select failure: errno = %d", errno);
- } else {
- /* If this is an ECONNABORTED error, just ignore it. */
- if (errno != ECONNABORTED) {
- /* Raise a fatal alarm. */
- /* Shut down */
- PERROR("Shutting down Listener thread. errno: %d", errno);
- g_bShutdown = true;
- }
+ } else if (errno != ECONNABORTED) {
+ /* If this is an ECONNABORTED error, just
+ * ignore it. Raise a fatal alarm and shut
+ * down.
+ */
+ PERROR("Shutting down Listener thread. errno: %d",
+ errno);
+ lutf_listener_shutdown();
}
- } else {
- if (FD_ISSET(g_iListenFd, &tReadSet)) {
- /* A new client is trying to connect. */
- tCliLen = sizeof(sCliAddr);
- iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr,
- &tCliLen);
- if (iConnFd < 0) {
- /* Cannot accept new connection...just ignore. */
- if (errno != EWOULDBLOCK)
- PERROR("Error on accept(), errno = %d", errno);
- } else {
- /* Try to see if we have an agent
- * with the same address, since
- * agents can have multiple tcp
- * connections open
- */
- agent = find_create_agent_blk_by_addr(&sCliAddr);
- if (!agent) {
- /* Cannot support more clients...just ignore. */
- PERROR("Cannot accept more clients");
- closeTcpConnection(iConnFd);
- } else {
- int iOption, iFlags;
-
- rc = complete_agent_connection(agent,
- iConnFd);
- if (rc != EN_LUTF_RC_OK) {
- int agent_id = agent->id;
- iMaxSelectFd = close_agent_connection(agent);
- release_agent_blk(agent);
- free_agent_blk(agent_id);
- continue;
- }
- /* all nodes listen on the
- * same port
- */
- agent->listen_port = info->listen_port;
-
- /* Add new client to our select mask. */
- FD_SET(iConnFd, &g_tAllSet);
- iMaxSelectFd = get_highest_fd();
-
- /* Ok, it seems that the connected socket gains
- * the same flags as the listen socket. We want
- * to make it blocking here.
- */
- iFlags = fcntl(iConnFd, F_GETFL, 0);
- fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
-
- /* And, we want to turn off Nagle's algorithm to
- * reduce latency
- */
- iOption = 1;
- setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
- (void *)&iOption,
- sizeof(iOption));
-
- PDEBUG("Received a connection from %s on FD %d\n",
- inet_ntoa(agent->addr.sin_addr), iConnFd);
-
- release_agent_blk(agent);
- }
- }
+ /* store the current time */
+ time_1 = time_2;
- /* See if there are other messages waiting. */
- iNReady--;
- }
+ /* Zero out the g_tAllSet */
+ FD_ZERO(&g_tAllSet);
- /* need to iterate through the clients and see if a
- * message was sent to any of them
- */
- for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
- /* reset the return code to avoid misbehaving on previous
- * returns
+ continue;
+ }
+
+ if (FD_ISSET(g_iListenFd, &tReadSet)) {
+ /* A new client is trying to connect. */
+ tCliLen = sizeof(sCliAddr);
+ iConnFd = accept(g_iListenFd,
+ (struct sockaddr *) &sCliAddr,
+ &tCliLen);
+ if (iConnFd < 0) {
+ /* Cannot accept new connection...
+ * just ignore.
*/
- rc = EN_LUTF_RC_OK;
+ if (errno != EWOULDBLOCK)
+ PERROR("Error on accept(), errno = %d",
+ errno);
+ } else {
+ /* Try to see if we have an agent
+ * with the same address, since
+ * agents can have multiple tcp
+ * connections open
+ */
+ agent = find_create_agent_blk_by_addr(&sCliAddr);
+ if (!agent) {
+ /* Cannot support more clients...just ignore. */
+ PERROR("Cannot accept more clients");
+ closeTcpConnection(iConnFd);
+ } else {
+ int iOption, iFlags;
- if ((agent = find_agent_blk_by_id(i))) {
- int hb_fd = INVALID_TCP_SOCKET;
- int rpc_fd = INVALID_TCP_SOCKET;
+ rc = complete_agent_connection(agent,
+ iConnFd);
+ if (rc != EN_LUTF_RC_OK) {
+ release_agent_blk(agent, true);
+ continue;
+ }
- if (FD_ISSET(agent->iFileDesc, &tReadSet))
- hb_fd = agent->iFileDesc;
- if (FD_ISSET(agent->iRpcFd, &tReadSet))
- rpc_fd = agent->iRpcFd;
+ /* all nodes listen on the
+ * same port
+ */
+ agent->listen_port = info->listen_port;
- if (hb_fd == INVALID_TCP_SOCKET &&
- rpc_fd == INVALID_TCP_SOCKET)
- continue;
+ /* Add new client to our select mask. */
+ FD_SET(iConnFd, &g_tAllSet);
+ g_iMaxSelectFd = get_highest_fd();
- /* process heart beat */
- if (hb_fd != INVALID_TCP_SOCKET) {
- /* process the message */
- rc = process_agent_message(agent, hb_fd);
- if (rc)
- PERROR("msg failure: %s",
- lutf_rc2str(rc));
- }
- if (rc == EN_LUTF_RC_SOCKET_FAIL) {
- int agent_id = agent->id;
- if (agent->id == master->id) {
- PERROR("Disconnected from master. Will attempt to reconnect");
- master_connected = false;
- }
- iMaxSelectFd = close_agent_connection(agent);
- release_agent_blk(agent);
- free_agent_blk(agent_id);
- continue;
- }
+ /* Ok, it seems that the connected socket gains
+ * the same flags as the listen socket. We want
+ * to make it blocking here.
+ */
+ iFlags = fcntl(iConnFd, F_GETFL, 0);
+ fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
- /* process rpc */
- if (rpc_fd != INVALID_TCP_SOCKET) {
- /* process the message */
- rc = process_agent_message(agent, rpc_fd);
- if (rc)
- PERROR("msg failure: %s",
- lutf_rc2str(rc));
- }
- if (rc == EN_LUTF_RC_SOCKET_FAIL) {
- int agent_id = agent->id;
- if (agent->id == master->id) {
- PERROR("Disconnected from master. Will attempt to reconnect");
- master_connected = false;
- }
- iMaxSelectFd = close_agent_connection(agent);
- release_agent_blk(agent);
- free_agent_blk(agent_id);
- continue;
- }
- release_agent_blk(agent);
+ /* And, we want to turn off Nagle's algorithm to
+ * reduce latency
+ */
+ iOption = 1;
+ setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
+ (void *)&iOption,
+ sizeof(iOption));
+
+ PDEBUG("Received a connection from %s on FD %d\n",
+ inet_ntoa(agent->addr.sin_addr), iConnFd);
}
}
- /* establish connection with the master if I'm an agent
- * and I have not connected to the master yet.
- * Otherwise send a heart beat
+ /* See if there are other messages waiting. */
+ iNReady--;
+ }
+
+ /* need to iterate through the clients and see if a
+ * message was sent to any of them
+ */
+ for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
+ /* reset the return code to avoid misbehaving on previous
+ * returns
*/
- if (!master_connected &&
- strlen(g_lutf_cfg.master_name) != 0) {
- PDEBUG("Attempting a connection on master %s",
- g_lutf_cfg.master_name);
- master = find_free_agent_blk(&info->hb_info.master_address);
- if (!master) {
- PERROR("Failed to allocate agent block");
+ rc = EN_LUTF_RC_OK;
+
+ if ((agent = find_agent_blk_by_id(i))) {
+ int hb_fd = INVALID_TCP_SOCKET;
+ int rpc_fd = INVALID_TCP_SOCKET;
+
+ release_agent_blk(agent, false);
+
+ if (FD_ISSET(agent->iFileDesc, &tReadSet))
+ hb_fd = agent->iFileDesc;
+ if (FD_ISSET(agent->iRpcFd, &tReadSet))
+ rpc_fd = agent->iRpcFd;
+
+ if (hb_fd == INVALID_TCP_SOCKET &&
+ rpc_fd == INVALID_TCP_SOCKET)
continue;
- }
- iConnFd = establishTCPConnection(
- info->hb_info.master_address.sin_addr.s_addr,
- htons(info->hb_info.master_address.sin_port),
- true, false);
-
- if (iConnFd < 0) {
- int master_id = master->id;
-
- PERROR("establishTCPConnection failure: %s. Clearing set",
- lutf_rc2str(iConnFd));
- iMaxSelectFd = close_agent_connection(master);
- release_agent_blk(master);
- free_agent_blk(master_id);
- PERROR("Disconnected from master. Will attempt to reconnect");
- master_connected = false;
+ /* process heart beat */
+ if (hb_fd != INVALID_TCP_SOCKET) {
+ /* process the message */
+ rc = process_agent_message(agent, hb_fd);
+ if (rc)
+ PERROR("msg failure: %s",
+ lutf_rc2str(rc));
+ }
+ if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+ if (agent->id == master->id) {
+ PERROR("Disconnected from master. Will attempt to reconnect");
+ master_connected = false;
+ }
+ release_agent_blk(agent, true);
continue;
}
- master->iFileDesc = iConnFd;
- memcpy(&master->addr,
- &info->hb_info.master_address,
- sizeof(master->addr));
- strncpy(master->name, g_lutf_cfg.master_name,
- MAX_STR_LEN);
- master->name[MAX_STR_LEN-1] = '\0';
- master->node_type = EN_LUTF_MASTER;
- gethostname(master->hostname, MAX_STR_LEN);
- master->telnet_port = info->hb_info.agent_telnet_port;
- release_agent_blk(master);
-
- PDEBUG("Connected to master %s on fd %d",
- master->name, master->iFileDesc);
-
- /*
- * add the master FD to the select FD set
- * to be able to process master messages
- */
- FD_SET(iConnFd, &g_tAllSet);
- iMaxSelectFd = get_highest_fd();
+ /* process rpc */
+ if (rpc_fd != INVALID_TCP_SOCKET) {
+ /* process the message */
+ rc = process_agent_message(agent, rpc_fd);
+ if (rc)
+ PERROR("msg failure: %s",
+ lutf_rc2str(rc));
+ }
+ if (rc == EN_LUTF_RC_SOCKET_FAIL) {
+ if (agent->id == master->id) {
+ PERROR("Disconnected from master. Will attempt to reconnect");
+ master_connected = false;
+ }
+ release_agent_blk(agent, true);
+ continue;
+ }
+ }
+ }
- master_connected = true;
- master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+ /* establish connection with the master if I'm an agent
+ * and I have not connected to the master yet.
+ * Otherwise send a heart beat
+ */
+ if (!master_connected &&
+ strlen(g_lutf_cfg.master_name) != 0) {
+ PDEBUG("Attempting a connection on master %s",
+ g_lutf_cfg.master_name);
+ master = find_free_agent_blk(&info->hb_info.master_address);
+ if (!master) {
+ PERROR("Failed to allocate agent block");
+ continue;
}
-/*
- if (info->type == EN_LUTF_AGENT) {
- rc = send_hb(master, info->hb_info.node_name,
- info->hb_info.agent_telnet_port,
- info->type);
- if (rc != EN_LUTF_RC_OK) {
- master_connected = false;
- iMaxSelectFd = get_highest_fd();
- }
+
+ iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
+ htons(info->hb_info.master_address.sin_port),
+ true, false);
+ if (iConnFd < 0) {
+ PERROR("establishTCPConnection failure: %s. Clearing set",
+ lutf_rc2str(iConnFd));
+ release_agent_blk(master, true);
+ PERROR("Disconnected from master. Will attempt to reconnect");
+ master_connected = false;
+ continue;
}
-*/
+
+ master->iFileDesc = iConnFd;
+ memcpy(&master->addr,
+ &info->hb_info.master_address,
+ sizeof(master->addr));
+ strncpy(master->name, g_lutf_cfg.master_name,
+ MAX_STR_LEN);
+ master->name[MAX_STR_LEN-1] = '\0';
+ master->node_type = EN_LUTF_MASTER;
+ gethostname(master->hostname, MAX_STR_LEN);
+ master->telnet_port = info->hb_info.agent_telnet_port;
+
+ PDEBUG("Connected to master %s on fd %d",
+ master->name, master->iFileDesc);
+
/*
- * Get the time stamp and go through each agent
- * and see if it's still healthy. For agents which
- * aren't healthy move off to the dead_list.
- * This operation is only valid if I'm a master
+ * add the master FD to the select FD set
+ * to be able to process master messages
*/
- gettimeofday(&time_2, NULL);
- if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) {
- /* check if HB_TO seconds has passed since the last
- * time we collected the time */
- if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
-
- /* do the heartbeat check */
- agent_hb_check(&time_1, info->type);
- }
+ FD_SET(iConnFd, &g_tAllSet);
+ g_iMaxSelectFd = get_highest_fd();
+
+ master_connected = true;
+ master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
+ }
+ /*
+ if (info->type == EN_LUTF_AGENT) {
+ rc = send_hb(master, info->hb_info.node_name,
+ info->hb_info.agent_telnet_port,
+ info->type);
+ if (rc != EN_LUTF_RC_OK) {
+ master_connected = false;
+ g_iMaxSelectFd = get_highest_fd();
}
+ }
+ */
+ /*
+ * Get the time stamp and go through each agent
+ * and see if it's still healthy. For agents which
+ * aren't healthy move off to the dead_list.
+ * This operation is only valid if I'm a master
+ */
+ gettimeofday(&time_2, NULL);
+ if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
+ /* check if HB_TO seconds has passed since the last
+ * time we collected the time
+ */
+ if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
+ /* do the heartbeat check */
+ agent_hb_check(&time_1, info->type);
+ }
+ }
+
+ if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
+ lutf_agent_blk_t *agent = NULL;
+ int idx = 0;
+
+ do {
+ idx = get_next_active_agent(idx, &agent);
+ /* A master doesn't send a heart
+ * beat to himself
+ */
+ if (agent) {
+ bool dead = false;
+ if (info->type == EN_LUTF_MASTER &&
+ agent->id == master->id)
+ continue;
- if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
- lutf_agent_blk_t *agent = NULL;
- int idx = 0;
-
- do {
- idx = get_next_active_agent(idx, &agent);
- /* A master doesn't send a heart
- * beat to himself */
- if (agent) {
- if (info->type == EN_LUTF_MASTER &&
- agent->id == master->id)
- continue;
- int agent_id = agent->id;
- rc = send_hb(agent, info->hb_info.node_name,
- info->hb_info.agent_telnet_port,
- info->type);
- if (rc != EN_LUTF_RC_OK) {
- if (agent->id == master->id) {
- PERROR("Disconnected from master. Will attempt to reconnect");
- master_connected = false;
- }
- iMaxSelectFd = close_agent_connection(agent);
- release_agent_blk(agent);
- free_agent_blk(agent_id);
- } else {
- release_agent_blk(agent);
+ rc = send_hb(agent, info->hb_info.node_name,
+ info->hb_info.agent_telnet_port,
+ info->type);
+ if (rc != EN_LUTF_RC_OK) {
+ if (agent->id == master->id) {
+ PERROR("Disconnected from master. Will attempt to reconnect");
+ master_connected = false;
}
+ dead = true;
}
- } while (agent);
- }
+ release_agent_blk(agent, dead);
+ }
+ } while (agent);
}
+
/* store the current time */
- time_1 = time_2;
+ memcpy(&time_1, &time_2, sizeof(time_1));
}
/* Zero out the g_tAllSet */