6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <netinet/tcp.h>
15 #include "lutf_python.h"
16 #include "lutf_agent.h"
17 #include "lutf_message.h"
18 #include "lutf_listener.h"
/*
 * Listener state.
 * g_tAllSet is the master select() read set: the listening socket and
 * every connected agent/master socket are added to it with FD_SET.
 */
20 static fd_set g_tAllSet;
/*
 * Loop-exit flag tested by lutf_listener_main() on every iteration.
 * NOTE(review): plain bool; if it is written from a thread other than the
 * listener thread, consider atomic_bool.
 */
21 static bool g_bShutdown;
22 static int g_iListenFd = INVALID_TCP_SOCKET;
/* Highest fd in g_tAllSet; select() is called with g_iMaxSelectFd + 1. */
23 static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
/* Message handler: msg is the payload that follows the message header,
 * agent is the agent block the message arrived from. */
25 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
27 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
28 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
29 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
/*
 * Dispatch table indexed by the message type taken from the header.
 * Message types without a designated initializer below are NULL.
 * NOTE(review): the index comes off the network; callers must range-check
 * it against EN_MSG_TYPE_MAX and check for a NULL slot before calling.
 */
31 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
32 [EN_MSG_TYPE_HB] = process_msg_hb,
33 [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
34 [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
/*
 * lutf_listener_shutdown - ask the listener thread to stop.
 *
 * It is also called from lutf_listener_main() itself when select() fails
 * with an unrecoverable errno.
 * NOTE(review): the body is not visible here; presumably it sets
 * g_bShutdown, which the lutf_listener_main() loop tests — confirm.
 */
37 void lutf_listener_shutdown(void)
/*
 * get_highest_fd - highest descriptor the listener must hand to select().
 *
 * Compares the highest agent descriptor reported by
 * lutf_agent_get_highest_fd() with the listening socket g_iListenFd and
 * logs the chosen value with PDEBUG. Callers store the result in
 * g_iMaxSelectFd, which lutf_listener_main() passes to select() as
 * g_iMaxSelectFd + 1.
 * NOTE(review): the assignments to iMaxFd and the return are not shown
 * here; presumably it returns the larger of iAgentFd and g_iListenFd.
 */
42 int get_highest_fd(void)
44 int iAgentFd = lutf_agent_get_highest_fd();
47 if (iAgentFd > g_iListenFd)
51 PDEBUG("Current highest FD = %d", iMaxFd);
/*
 * process_msg_rpc_request - hand an RPC request payload to the Python side.
 *
 * Sets LUTF_AGENT_WORK_IN_PROGRESS in agent->state, calls
 * python_handle_rpc_request(msg), then clears the flag again whatever the
 * call returned.
 * NOTE(review): msg is passed through unchanged; whether it must be
 * NUL-terminated depends on python_handle_rpc_request() — confirm.
 */
56 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
60 agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
61 rc = python_handle_rpc_request(msg);
62 agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
/*
 * process_msg_hb - apply a heartbeat received from an agent.
 *
 * Treats msg as a lutf_msg_hb_t and performs these steps:
 *  - converts telnet_port and node_type from network byte order, in place
 *    in the payload buffer;
 *  - copies the port, node type, hostname and name into the agent block,
 *    with the strings NUL-terminated at MAX_STR_LEN - 1;
 *  - refreshes agent->time_stamp, which agent_hb_check() compares against
 *    to decide whether the agent is still alive.
 * NOTE(review): the payload length is not passed in, so nothing here checks
 * that msg actually holds a full lutf_msg_hb_t; the caller must ensure it.
 */
67 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
69 lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
70 //PERROR("Procesing HB message");
72 /* endian convert message */
73 hb->telnet_port = ntohl(hb->telnet_port);
74 hb->node_type = ntohl(hb->node_type);
76 /* update the agent with the information */
77 agent->telnet_port = hb->telnet_port;
78 agent->node_type = hb->node_type;
79 strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
80 agent->hostname[MAX_STR_LEN-1] = '\0';
81 strncpy(agent->name, hb->node_name, MAX_STR_LEN);
82 agent->name[MAX_STR_LEN-1] = '\0';
83 gettimeofday(&agent->time_stamp, NULL);
/*
 * process_msg_get_num_agents - answer a "number of agents" query.
 *
 * Sends a lutf_msg_num_agents_query_t whose num_agents field is
 * get_num_agents() back on the agent's HB socket (agent->iFileDesc) with
 * sendTcpMessage(). A send failure is logged with PERROR.
 * NOTE(review): num_agents is not converted with htonl(), unlike the
 * heartbeat fields in send_hb(); confirm the peer expects host byte order.
 * NOTE(review): the reply uses sendTcpMessage() rather than lutf_send_msg()
 * (as send_hb() does), so it presumably carries no lutf_message_hdr_t;
 * confirm that matches what the requester reads.
 */
88 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
91 lutf_msg_num_agents_query_t query;
93 query.num_agents = get_num_agents();
94 rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
96 PERROR("failed to send tcp message to get num agents query");
100 return EN_LUTF_RC_OK;
103 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
105 lutf_rc_t rc = EN_LUTF_RC_OK;
106 lutf_message_hdr_t hdr;
108 msg_process_fn_t proc_fn;
110 /* get the header first */
111 rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
112 TCP_READ_TIMEOUT_SEC);
117 hdr.version = ntohl(hdr.version);
118 if (hdr.version != LUTF_VERSION_NUMBER) {
119 PERROR("version %d != %d", hdr.version,
120 LUTF_VERSION_NUMBER);
121 return EN_LUTF_RC_BAD_VERSION;
124 /* if the ips don't match ignore the message */
125 if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
128 hdr.type = ntohl(hdr.type);
129 hdr.len = ntohl(hdr.len);
131 buffer = calloc(hdr.len, 1);
133 return EN_LUTF_RC_OOM;
135 /* get the rest of the message */
136 rc = readTcpMessage(fd, buffer, hdr.len,
137 TCP_READ_TIMEOUT_SEC);
144 /* call the appropriate processing function */
145 proc_fn = msg_process_tbl[hdr.type];
147 rc = proc_fn(buffer, agent);
/*
 * init_comm - create the listener's TCP server socket.
 *
 * Performs these steps, in order:
 *  - ignores SIGPIPE for the whole process;
 *  - creates an AF_INET stream socket in g_iListenFd and sets SO_REUSEADDR;
 *  - binds it to INADDR_ANY:@server_port and listens with a backlog of 2;
 *  - switches it to O_NONBLOCK and adds it to g_tAllSet.
 *
 * Returns EN_LUTF_RC_OK on success. On failure it returns the code for the
 * failing call:
 *  - socket(): EN_LUTF_RC_SOCKET_FAIL
 *  - setsockopt(): EN_LUTF_RC_FAIL
 *  - bind(): EN_LUTF_RC_BIND_FAILED
 *  - listen(): EN_LUTF_RC_LISTEN_FAILED
 *
 * NOTE(review): on the failure paths the socket is closed with
 * closeTcpConnection(), but no reset of g_iListenFd to INVALID_TCP_SOCKET
 * is visible. The fcntl() return values are also not checked.
 */
153 static lutf_rc_t init_comm(unsigned short server_port)
156 struct sockaddr_in sServAddr;
158 signal(SIGPIPE, SIG_IGN);
160 /* Create a socket to listen to. */
161 g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
162 if (g_iListenFd < 0) {
163 /* Cannot create a listening socket. */
164 return EN_LUTF_RC_SOCKET_FAIL;
167 /* Set a socket option which will allow us to be quickly restarted
171 if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
172 sizeof(iFlags)) < 0) {
173 /* Cannot change the socket options. */
174 closeTcpConnection(g_iListenFd);
175 return EN_LUTF_RC_FAIL;
178 /* Bind to our listening socket. */
179 bzero((char *) &sServAddr, sizeof(sServAddr));
180 sServAddr.sin_family = AF_INET;
181 sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
182 sServAddr.sin_port = htons(server_port);
184 if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
185 sizeof(sServAddr)) < 0) {
186 /* Cannot bind our listening socket. */
187 closeTcpConnection(g_iListenFd);
188 return EN_LUTF_RC_BIND_FAILED;
191 /* Let the system know we wish to listen to this port for
194 if (listen(g_iListenFd, 2) < 0) {
195 /* Cannot listen to socket, close and fail */
196 closeTcpConnection(g_iListenFd);
197 return EN_LUTF_RC_LISTEN_FAILED;
200 /* We want this socket to be non-blocking even though it will be used
201 * in a blocking select call. This is to avoid a problem identified by
204 iFlags = fcntl(g_iListenFd, F_GETFL, 0);
205 fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
207 /* Add the listening socket to our select() mask. */
209 FD_SET(g_iListenFd, &g_tAllSet);
211 return EN_LUTF_RC_OK;
/*
 * send_hb - send a heartbeat (EN_MSG_TYPE_HB) on agent->iFileDesc.
 *
 * Fills hb and sends it with lutf_send_msg(); a send failure is logged
 * with PERROR. hb is presumably a lutf_msg_hb_t; its declaration is not
 * shown. It carries:
 *  - telnet_port and node_type, in network byte order;
 *  - @name, copied as the node name, NUL-terminated at MAX_STR_LEN - 1;
 *  - this host's name, from gethostname().
 * Messages of this type are dispatched to process_msg_hb() on the
 * receiving listener.
 */
214 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
220 hb.telnet_port = htonl(telnet_port);
221 hb.node_type = htonl(type);
222 strncpy(hb.node_name, name, MAX_STR_LEN);
223 hb.node_name[MAX_STR_LEN-1] = '\0';
/*
 * NOTE(review): POSIX leaves it unspecified whether gethostname()
 * NUL-terminates a truncated name, and its return value is ignored here.
 * Consider forcing hb.node_hostname[MAX_STR_LEN-1] = '\0', as is done for
 * node_name. If hb is not zero-initialized, bytes after the hostname's
 * terminator are sent uninitialized.
 */
224 gethostname(hb.node_hostname, MAX_STR_LEN);
226 /* send the heart beat */
227 rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
228 sizeof(hb), EN_MSG_TYPE_HB);
229 if (rc != EN_LUTF_RC_OK) {
230 PERROR("Failed to send heart beat %s\n",
237 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
239 /* we assume the first connection is an HB connection */
240 if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
241 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
242 PERROR("agent in unexpected state. state is %s, but HB FD is %d",
243 agent_state2str(agent), fd);
244 return EN_LUTF_RC_SYS_ERR;
246 PDEBUG("HB Channel Connected: %d", fd);
247 agent->iFileDesc = fd;
248 agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
249 return EN_LUTF_RC_OK;
251 } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
252 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
253 PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
254 agent_state2str(agent), fd);
255 return EN_LUTF_RC_SYS_ERR;
257 PDEBUG("RPC Channel Connected: %d", fd);
259 agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
260 return EN_LUTF_RC_OK;
264 PERROR("agent is in an unexpected state on connection %s",
265 agent_state2str(agent));
266 return EN_LUTF_RC_SYS_ERR;
269 void close_agent_connection(lutf_agent_blk_t *agent)
271 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
272 FD_CLR(agent->iFileDesc, &g_tAllSet);
273 closeTcpConnection(agent->iFileDesc);
274 agent->iFileDesc = -1;
276 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
277 FD_CLR(agent->iRpcFd, &g_tAllSet);
278 closeTcpConnection(agent->iRpcFd);
281 g_iMaxSelectFd = get_highest_fd();
/*
 * agent_hb_check - look for agents whose heartbeats have stopped.
 *
 * Walks agent ids 0..MAX_NUM_AGENTS-1 via find_agent_blk_by_id(). An agent
 * is presumed dead when both of these hold:
 *  - its node_type differs from @me;
 *  - @t is at least HB_TO*100 seconds past its last heartbeat time stamp
 *    (agent->time_stamp, refreshed by process_msg_hb()).
 * A dead agent is logged with PERROR and released with
 * release_agent_blk(agent, true). The release_agent_blk(agent, false) call
 * presumably drops the reference taken by the lookup — confirm.
 * NOTE(review): the listener sends heartbeats every HB_TO seconds, so
 * HB_TO*100 means roughly 100 missed heartbeats; confirm the factor is
 * intended.
 */
284 void agent_hb_check(struct timeval *t, lutf_type_t me)
286 lutf_agent_blk_t *agent;
289 for (i = 0; i < MAX_NUM_AGENTS; i++) {
290 agent = find_agent_blk_by_id(i);
292 if (agent && agent->node_type != me) {
293 if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
294 /* agent didn't send a HB move to dead
297 PERROR("agent %s presumed dead", agent->name);
298 release_agent_blk(agent, true);
303 release_agent_blk(agent, false);
309 * main loop. Listens for incoming agent connections, and for agent
310 * messages. Every period of time it triggers a walk through the agent
311 * list to see if any of the HBs stopped
313 * If I am an Agent, then attempt to connect to the master and add an
314 * agent block on the list of agents. After successful connection send
315 * a regular heart beat.
317 * Since the master's agent block is on the list of agents and its FD is
318 * on the select FD set, then if the master sends the agent a message
319 * the agent should be able to process it.
321 void *lutf_listener_main(void *usr_data)
324 struct sockaddr_in sCliAddr;
330 lutf_agent_blk_t *agent = NULL, *master = NULL;
331 struct timeval time_1, time_2, select_to;
332 lutf_listener_info_t *info;
333 bool master_connected = false;
335 info = (lutf_listener_info_t *)usr_data;
337 ((info) && (info->listen_port == 0))) {
338 PERROR("No liston port provided");
342 rc = init_comm(info->listen_port);
344 PERROR("init_comm failed: %s", lutf_rc2str(rc));
350 g_iMaxSelectFd = g_iListenFd;
352 gettimeofday(&time_2, NULL);
354 /* Main Processing Loop: Keep going until we have reason
357 while (!g_bShutdown) {
358 /* Wait on our select mask for an event to occur. */
359 select_to.tv_sec = HB_TO;
360 select_to.tv_usec = 0;
363 tReadSet = g_tAllSet;
364 iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
367 release_dead_list_agents();
369 /* Determine if we failed the select call */
371 /* Check to see if we were interrupted by a signal. */
372 if ((errno == EINTR) || (errno == EAGAIN)) {
373 PERROR("Select failure: errno = %d", errno);
374 } else if (errno != ECONNABORTED) {
375 /* If this is an ECONNABORTED error, just
376 * ignore it. Raise a fatal alarm and shut
379 PERROR("Shutting down Listener thread. errno: %d",
381 lutf_listener_shutdown();
384 /* store the current time */
387 /* Zero out the g_tAllSet */
393 if (FD_ISSET(g_iListenFd, &tReadSet)) {
394 /* A new client is trying to connect. */
395 tCliLen = sizeof(sCliAddr);
396 iConnFd = accept(g_iListenFd,
397 (struct sockaddr *) &sCliAddr,
400 /* Cannot accept new connection...
403 if (errno != EWOULDBLOCK)
404 PERROR("Error on accept(), errno = %d",
407 /* Try to see if we have an agent
408 * with the same address, since
409 * agents can have multiple tcp
412 agent = find_create_agent_blk_by_addr(&sCliAddr);
414 /* Cannot support more clients...just ignore. */
415 PERROR("Cannot accept more clients");
416 closeTcpConnection(iConnFd);
420 rc = complete_agent_connection(agent,
422 if (rc != EN_LUTF_RC_OK) {
423 release_agent_blk(agent, true);
427 /* all nodes listen on the
430 agent->listen_port = info->listen_port;
432 /* Add new client to our select mask. */
433 FD_SET(iConnFd, &g_tAllSet);
434 g_iMaxSelectFd = get_highest_fd();
436 /* Ok, it seems that the connected socket gains
437 * the same flags as the listen socket. We want
438 * to make it blocking here.
440 iFlags = fcntl(iConnFd, F_GETFL, 0);
441 fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
443 /* And, we want to turn off Nagle's algorithm to
447 setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
451 PDEBUG("Received a connection from %s on FD %d\n",
452 inet_ntoa(agent->addr.sin_addr), iConnFd);
456 /* See if there are other messages waiting. */
460 /* need to iterate through the clients and see if a
461 * message was sent to any of them
463 for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
464 /* reset the return code to avoid misbehaving on previous
469 if ((agent = find_agent_blk_by_id(i))) {
470 int hb_fd = INVALID_TCP_SOCKET;
471 int rpc_fd = INVALID_TCP_SOCKET;
473 release_agent_blk(agent, false);
475 if (FD_ISSET(agent->iFileDesc, &tReadSet))
476 hb_fd = agent->iFileDesc;
477 if (FD_ISSET(agent->iRpcFd, &tReadSet))
478 rpc_fd = agent->iRpcFd;
480 if (hb_fd == INVALID_TCP_SOCKET &&
481 rpc_fd == INVALID_TCP_SOCKET)
484 /* process heart beat */
485 if (hb_fd != INVALID_TCP_SOCKET) {
486 /* process the message */
487 rc = process_agent_message(agent, hb_fd);
489 PERROR("msg failure: %s",
492 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
493 if (agent->id == master->id) {
494 PERROR("Disconnected from master. Will attempt to reconnect");
495 master_connected = false;
497 release_agent_blk(agent, true);
502 if (rpc_fd != INVALID_TCP_SOCKET) {
503 /* process the message */
504 rc = process_agent_message(agent, rpc_fd);
506 PERROR("msg failure: %s",
509 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
510 if (agent->id == master->id) {
511 PERROR("Disconnected from master. Will attempt to reconnect");
512 master_connected = false;
514 release_agent_blk(agent, true);
520 /* establish connection with the master if I'm an agent
521 * and I have not connected to the master yet.
522 * Otherwise send a heart beat
524 if (!master_connected &&
525 strlen(g_lutf_cfg.master_name) != 0) {
526 PDEBUG("Attempting a connection on master %s",
527 g_lutf_cfg.master_name);
528 master = find_free_agent_blk(&info->hb_info.master_address);
530 PERROR("Failed to allocate agent block");
534 iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
535 htons(info->hb_info.master_address.sin_port),
538 PERROR("establishTCPConnection failure: %s. Clearing set",
539 lutf_rc2str(iConnFd));
540 release_agent_blk(master, true);
541 PERROR("Disconnected from master. Will attempt to reconnect");
542 master_connected = false;
546 master->iFileDesc = iConnFd;
547 memcpy(&master->addr,
548 &info->hb_info.master_address,
549 sizeof(master->addr));
550 strncpy(master->name, g_lutf_cfg.master_name,
552 master->name[MAX_STR_LEN-1] = '\0';
553 master->node_type = EN_LUTF_MASTER;
554 gethostname(master->hostname, MAX_STR_LEN);
555 master->telnet_port = info->hb_info.agent_telnet_port;
557 PDEBUG("Connected to master %s on fd %d",
558 master->name, master->iFileDesc);
561 * add the master FD to the select FD set
562 * to be able to process master messages
564 FD_SET(iConnFd, &g_tAllSet);
565 g_iMaxSelectFd = get_highest_fd();
567 master_connected = true;
568 master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
571 if (info->type == EN_LUTF_AGENT) {
572 rc = send_hb(master, info->hb_info.node_name,
573 info->hb_info.agent_telnet_port,
575 if (rc != EN_LUTF_RC_OK) {
576 master_connected = false;
577 g_iMaxSelectFd = get_highest_fd();
582 * Get the time stamp and go through each agent
583 * and see if it's still healthy. For agents which
584 * aren't healthy move off to the dead_list.
585 * This operation is only valid if I'm a master
587 gettimeofday(&time_2, NULL);
588 if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
589 /* check if HB_TO seconds has passed since the last
590 * time we collected the time
592 if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
593 /* do the heartbeat check */
594 agent_hb_check(&time_1, info->type);
598 if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
599 lutf_agent_blk_t *agent = NULL;
603 idx = get_next_active_agent(idx, &agent);
604 /* A master doesn't send a heart
609 if (info->type == EN_LUTF_MASTER &&
610 agent->id == master->id)
613 rc = send_hb(agent, info->hb_info.node_name,
614 info->hb_info.agent_telnet_port,
616 if (rc != EN_LUTF_RC_OK) {
617 if (agent->id == master->id) {
618 PERROR("Disconnected from master. Will attempt to reconnect");
619 master_connected = false;
623 release_agent_blk(agent, dead);
628 /* store the current time */
629 memcpy(&time_1, &time_2, sizeof(time_1));
632 /* Zero out the g_tAllSet */