6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <netinet/tcp.h>
15 #include "lutf_python.h"
16 #include "lutf_agent.h"
17 #include "lutf_message.h"
18 #include "lutf_listener.h"
/* Descriptor set holding the listening socket (added in init_comm()) and
 * the connection descriptors added by the main loop. The main loop copies
 * it into its read set before every select() call.
 */
20 static fd_set g_tAllSet;
/* The main loop in lutf_listener_main() keeps running while this is false. */
21 static bool g_bShutdown = false;
/* Listening socket; holds INVALID_TCP_SOCKET until init_comm() creates it. */
22 static int g_iListenFd = INVALID_TCP_SOCKET;
/* Not static, so it has external linkage. When true and this node is a
 * master, the main loop performs the periodic agent heart-beat check.
 */
23 bool g_agent_enable_hb = true;
/* Common signature of the message handlers: msg is the message payload and
 * agent is the agent block the message was received for.
 */
25 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
/* Forward declarations so the handlers can be referenced by the table below. */
27 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
28 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
29 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
/* Dispatch table indexed by message type. process_agent_message() picks the
 * handler using the type field of the received header. Slots that have no
 * designated initializer are NULL.
 */
31 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
32 [EN_MSG_TYPE_HB] = process_msg_hb,
33 [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
34 [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
/* Public entry point taking no arguments and returning nothing.
 * NOTE(review): the body is not shown here; presumably it sets g_bShutdown
 * so that the main loop in lutf_listener_main() exits -- TODO confirm.
 */
37 void lutf_listener_shutdown(void)
/* Work out the highest descriptor the listener cares about by comparing the
 * highest agent descriptor, as reported by lutf_agent_get_highest_fd(), with
 * the listening socket g_iListenFd. Callers in this file store the result in
 * iMaxSelectFd, which is passed (plus one) as the first argument of select().
 * NOTE(review): the assignment of iMaxFd and the return statement are not
 * shown here; presumably the larger of the two values is returned -- confirm.
 */
42 int get_highest_fd(void)
44 int iAgentFd = lutf_agent_get_highest_fd();
47 if (iAgentFd > g_iListenFd)
51 PDEBUG("Current highest FD = %d", iMaxFd);
/* Handler for EN_MSG_TYPE_RPC_REQUEST messages.
 *
 * msg:   message payload, handed unchanged to python_handle_rpc_request().
 * agent: agent block the request arrived for.
 *
 * NOTE(review): the return statement is not shown here; presumably the
 * result of python_handle_rpc_request() is returned -- confirm.
 */
56 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
/* flag the agent as busy for the duration of the request */
60 agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
61 rc = python_handle_rpc_request(msg);
/* request handled, whatever the outcome: drop the busy flag again */
62 agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
/* Handler for EN_MSG_TYPE_HB (heart beat) messages.
 *
 * msg:   payload, interpreted as a lutf_msg_hb_t. The telnet_port and
 *        node_type fields are converted to host byte order in place, so the
 *        caller buffer is modified.
 * agent: agent block that is refreshed with the port, node type, host name,
 *        node name and a new time stamp.
 */
67 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
69 lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
70 //PERROR("Procesing HB message");
72 /* endian convert message */
73 hb->telnet_port = ntohl(hb->telnet_port);
74 hb->node_type = ntohl(hb->node_type);
76 /* update the agent with the information */
77 agent->telnet_port = hb->telnet_port;
78 agent->node_type = hb->node_type;
/* strncpy() does not terminate the destination when the source fills it,
 * hence the explicit terminator after each copy
 */
79 strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
80 agent->hostname[MAX_STR_LEN-1] = '\0';
81 strncpy(agent->name, hb->node_name, MAX_STR_LEN);
82 agent->name[MAX_STR_LEN-1] = '\0';
/* record when this heart beat was processed */
83 gettimeofday(&agent->time_stamp, NULL);
/* Handler for EN_MSG_TYPE_GET_NUM_AGENTS messages: answers with the value of
 * get_num_agents(), sent back on the descriptor stored in agent->iFileDesc.
 *
 * msg:   not referenced by any of the lines shown.
 * agent: agent block whose iFileDesc receives the reply.
 *
 * A send failure is logged with PERROR.
 * NOTE(review): send_hb() converts its integer fields with htonl(), but no
 * byte-order conversion of num_agents is shown here -- confirm whether the
 * receiver expects host or network order.
 */
88 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
91 lutf_msg_num_agents_query_t query;
93 query.num_agents = get_num_agents();
94 rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
96 PERROR("failed to send tcp message to get num agents query");
100 return EN_LUTF_RC_OK;
/* Read one message from fd and hand it to the matching handler.
 *
 * Steps shown: read a fixed-size lutf_message_hdr_t, check the protocol
 * version, compare the IP carried in the header with the address stored in
 * the agent block, convert type and len to host byte order, allocate a
 * zeroed buffer of len bytes, read the payload into it and call the handler
 * found in msg_process_tbl.
 *
 * agent: agent block the descriptor belongs to.
 * fd:    descriptor to read from (callers pass the HB or the RPC descriptor).
 *
 * Returns EN_LUTF_RC_BAD_VERSION on a version mismatch and EN_LUTF_RC_OOM on
 * the allocation failure path; otherwise rc carries the result of the reads
 * or of the handler.
 *
 * NOTE(review): hdr.type indexes msg_process_tbl (EN_MSG_TYPE_MAX slots) and
 * hdr.len sizes the allocation. Both values come from the peer and no range
 * check on either is shown here -- confirm that one exists.
 * NOTE(review): the release of buffer is not shown here -- confirm it is
 * freed on every path.
 */
103 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
105 lutf_rc_t rc = EN_LUTF_RC_OK;
106 lutf_message_hdr_t hdr;
108 msg_process_fn_t proc_fn;
110 /* get the header first */
111 rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
112 TCP_READ_TIMEOUT_SEC);
/* header fields arrive in network byte order */
117 hdr.version = ntohl(hdr.version);
118 if (hdr.version != LUTF_VERSION_NUMBER) {
119 PERROR("version %d != %d", hdr.version,
120 LUTF_VERSION_NUMBER);
121 return EN_LUTF_RC_BAD_VERSION;
124 /* if the ips don't match ignore the message */
125 if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
128 hdr.type = ntohl(hdr.type);
129 hdr.len = ntohl(hdr.len);
/* payload buffer sized from the length announced in the header */
131 buffer = calloc(hdr.len, 1);
133 return EN_LUTF_RC_OOM;
135 /* get the rest of the message */
136 rc = readTcpMessage(fd, buffer, hdr.len,
137 TCP_READ_TIMEOUT_SEC);
144 /* call the appropriate processing function */
145 proc_fn = msg_process_tbl[hdr.type];
147 rc = proc_fn(buffer, agent);
/* Create the TCP listening socket for this node.
 *
 * Steps shown: ignore SIGPIPE, create an AF_INET stream socket, set
 * SO_REUSEADDR, bind to INADDR_ANY on server_port, listen with a backlog of
 * 2, switch the socket to O_NONBLOCK and add it to g_tAllSet.
 *
 * server_port: port to listen on, in host byte order (htons() is applied
 *              here).
 *
 * Returns EN_LUTF_RC_OK on success, EN_LUTF_RC_SOCKET_FAIL when socket()
 * fails, EN_LUTF_RC_FAIL when setsockopt() fails, EN_LUTF_RC_BIND_FAILED when
 * bind() fails and EN_LUTF_RC_LISTEN_FAILED when listen() fails. On the last
 * three failures the socket is closed through closeTcpConnection().
 *
 * NOTE(review): the results of both fcntl() calls are not checked in the
 * lines shown.
 */
153 static lutf_rc_t init_comm(unsigned short server_port)
156 struct sockaddr_in sServAddr;
/* with SIGPIPE ignored, writing to a closed peer fails with an error
 * instead of raising a signal
 */
158 signal(SIGPIPE, SIG_IGN);
160 /* Create a socket to listen to. */
161 g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
162 if (g_iListenFd < 0) {
163 /* Cannot create a listening socket. */
164 return EN_LUTF_RC_SOCKET_FAIL;
167 /* Set a socket option which will allow us to be quickly restarted
171 if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
172 sizeof(iFlags)) < 0) {
173 /* Cannot change the socket options. */
174 closeTcpConnection(g_iListenFd);
175 return EN_LUTF_RC_FAIL;
178 /* Bind to our listening socket. */
179 bzero((char *) &sServAddr, sizeof(sServAddr));
180 sServAddr.sin_family = AF_INET;
181 sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
182 sServAddr.sin_port = htons(server_port);
184 if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) {
185 /* Cannot bind our listening socket. */
186 closeTcpConnection(g_iListenFd);
187 return EN_LUTF_RC_BIND_FAILED;
190 /* Let the system know we wish to listen to this port for
192 if (listen(g_iListenFd, 2) < 0) {
193 /* Cannot listen to socket, close and fail */
194 closeTcpConnection(g_iListenFd);
195 return EN_LUTF_RC_LISTEN_FAILED;
198 /* We want this socket to be non-blocking even though it will be used
199 * in a blocking select call. This is to avoid a problem identified by
202 iFlags = fcntl(g_iListenFd, F_GETFL, 0);
203 fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
205 /* Add the listening socket to our select() mask. */
207 FD_SET(g_iListenFd, &g_tAllSet);
209 return EN_LUTF_RC_OK;
/* Tear down both connections of an agent: the HB descriptor (iFileDesc) and
 * the RPC descriptor (iRpcFd) are removed from g_tAllSet and closed through
 * closeTcpConnection().
 *
 * Returns the value of get_highest_fd() computed after the teardown; callers
 * in this file assign it to iMaxSelectFd.
 *
 * NOTE(review): the two lines that mention the *_CHANNEL_CONNECTED flags are
 * shown only in part; presumably they clear those bits in agent->state --
 * confirm.
 * NOTE(review): no guard against INVALID_TCP_SOCKET is shown before the
 * FD_CLR() calls. POSIX leaves FD_CLR() undefined for a negative descriptor
 * -- confirm the value of INVALID_TCP_SOCKET and whether a guard exists.
 */
212 static inline int close_agent_connection(lutf_agent_blk_t *agent)
214 FD_CLR(agent->iFileDesc, &g_tAllSet);
215 FD_CLR(agent->iRpcFd, &g_tAllSet);
216 closeTcpConnection(agent->iRpcFd);
217 closeTcpConnection(agent->iFileDesc);
219 ~LUTF_AGENT_RPC_CHANNEL_CONNECTED;
221 ~LUTF_AGENT_HB_CHANNEL_CONNECTED;
222 return get_highest_fd();
/* Build a heart-beat message and send it on the HB descriptor of agent.
 *
 * agent:       destination; the message goes out on agent->iFileDesc.
 * name:        node name, copied into the message and truncated to fit
 *              MAX_STR_LEN including the terminator.
 * telnet_port: sent in network byte order.
 * type:        node type, sent in network byte order (its declaration is on
 *              a part of the signature that is not shown here).
 *
 * The host name field is filled from gethostname(). The message is sent with
 * lutf_send_msg() as EN_MSG_TYPE_HB and a failure is logged with PERROR.
 *
 * NOTE(review): the result of gethostname() is not checked, and POSIX does
 * not guarantee termination when the name is truncated -- confirm whether
 * hb is zeroed beforehand.
 */
225 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
231 hb.telnet_port = htonl(telnet_port);
232 hb.node_type = htonl(type);
233 strncpy(hb.node_name, name, MAX_STR_LEN);
234 hb.node_name[MAX_STR_LEN-1] = '\0';
235 gethostname(hb.node_hostname, MAX_STR_LEN);
237 /* send the heart beat */
238 rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
239 sizeof(hb), EN_MSG_TYPE_HB);
240 if (rc != EN_LUTF_RC_OK) {
241 PERROR("Failed to send heart beat %s\n",
/* Attach a newly accepted descriptor to an agent block.
 *
 * The first connection of an agent is taken as its HB channel. A connection
 * that arrives while the HB channel is already marked connected is taken as
 * the RPC channel.
 *
 * agent: agent block the connection belongs to.
 * fd:    the new connection descriptor.
 *
 * Returns EN_LUTF_RC_OK once the channel flag has been set. Returns
 * EN_LUTF_RC_SYS_ERR when a channel is not flagged as connected but its
 * descriptor is already valid, or when both channels are already connected.
 *
 * NOTE(review): the store of fd into agent->iRpcFd is not shown here in the
 * RPC branch -- confirm it is present.
 * NOTE(review): both error messages print fd, the new descriptor, although
 * the text reads as if the descriptor already stored in the agent block was
 * meant -- confirm the intent.
 */
248 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
250 /* we assume the first connection is an HB connection */
251 if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
/* flag says not connected, so a valid descriptor here is inconsistent */
252 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
253 PERROR("agent in unexpected state. "
254 "state is %s, but HB FD is %d",
255 agent_state2str(agent), fd);
256 return EN_LUTF_RC_SYS_ERR;
258 PDEBUG("HB Channel Connected: %d", fd);
259 agent->iFileDesc = fd;
260 agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
261 return EN_LUTF_RC_OK;
/* HB channel already up: treat this connection as the RPC channel */
263 } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
264 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
265 PERROR("agent in unexpected state. "
266 "state is %s, but RPC FD is %d",
267 agent_state2str(agent), fd);
268 return EN_LUTF_RC_SYS_ERR;
270 PDEBUG("RPC Channel Connected: %d", fd);
272 agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
273 return EN_LUTF_RC_OK;
/* both channels already connected: a further connection is unexpected */
277 PERROR("agent is in an unexpected state on connection %s",
278 agent_state2str(agent));
279 return EN_LUTF_RC_SYS_ERR;
284 * main loop. Listens for incoming agent connections, and for agent
285 * messages. Every period of time it triggers a walk through the agent
286 * list to see if any of the HBs stopped
288 * If I am an Agent, then attempt to connect to the master and add an
289 * agent block on the list of agents. After successful connection send
290 * a regular heart beat.
292 * Since the master's agent block is on the list of agents and its FD is
293 * on the select FD set, then if the master sends the agent a message
294 * the agent should be able to process it.
296 void *lutf_listener_main(void *usr_data)
299 struct sockaddr_in sCliAddr;
306 lutf_agent_blk_t *agent = NULL, *master = NULL;
307 struct timeval time_1, time_2, select_to;
308 lutf_listener_info_t *info;
309 bool master_connected = false;
311 info = (lutf_listener_info_t *)usr_data;
313 ((info) && (info->listen_port == 0))) {
314 PERROR("No liston port provided");
318 rc = init_comm(info->listen_port);
320 PERROR("init_comm failed: %s", lutf_rc2str(rc));
326 iMaxSelectFd = g_iListenFd;
328 gettimeofday(&time_1, NULL);
331 /* Main Processing Loop: Keep going until we have reason to shutdown. */
332 while (!g_bShutdown) {
333 /* Wait on our select mask for an event to occur. */
334 tReadSet = g_tAllSet;
336 select_to.tv_sec = HB_TO;
337 select_to.tv_usec = 0;
339 iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to);
341 /* Determine if we failed the select call */
343 /* Check to see if we were interrupted by a signal. */
344 if ((errno == EINTR) || (errno == EAGAIN)) {
345 PERROR("Select failure: errno = %d", errno);
347 /* If this is an ECONNABORTED error, just ignore it. */
348 if (errno != ECONNABORTED) {
349 /* Raise a fatal alarm. */
351 PERROR("Shutting down Listener thread. errno: %d", errno);
356 if (FD_ISSET(g_iListenFd, &tReadSet)) {
357 /* A new client is trying to connect. */
358 tCliLen = sizeof(sCliAddr);
359 iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr,
362 /* Cannot accept new connection...just ignore. */
363 if (errno != EWOULDBLOCK)
364 PERROR("Error on accept(), errno = %d", errno);
366 /* Try to see if we have an agent
367 * with the same address, since
368 * agents can have multiple tcp
371 agent = find_create_agent_blk_by_addr(&sCliAddr);
373 /* Cannot support more clients...just ignore. */
374 PERROR("Cannot accept more clients");
375 closeTcpConnection(iConnFd);
379 rc = complete_agent_connection(agent,
381 if (rc != EN_LUTF_RC_OK) {
382 int agent_id = agent->id;
383 iMaxSelectFd = close_agent_connection(agent);
384 release_agent_blk(agent);
385 free_agent_blk(agent_id);
389 /* all nodes listen on the
392 agent->listen_port = info->listen_port;
394 /* Add new client to our select mask. */
395 FD_SET(iConnFd, &g_tAllSet);
396 iMaxSelectFd = get_highest_fd();
398 /* Ok, it seems that the connected socket gains
399 * the same flags as the listen socket. We want
400 * to make it blocking here.
402 iFlags = fcntl(iConnFd, F_GETFL, 0);
403 fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
405 /* And, we want to turn off Nagle's algorithm to
409 setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
413 PDEBUG("Received a connection from %s on FD %d\n",
414 inet_ntoa(agent->addr.sin_addr), iConnFd);
416 release_agent_blk(agent);
420 /* See if there are other messages waiting. */
424 /* need to iterate through the clients and see if a
425 * message was sent to any of them
427 for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
428 /* reset the return code to avoid misbehaving on previous
433 if ((agent = find_agent_blk_by_id(i))) {
434 int hb_fd = INVALID_TCP_SOCKET;
435 int rpc_fd = INVALID_TCP_SOCKET;
437 if (FD_ISSET(agent->iFileDesc, &tReadSet))
438 hb_fd = agent->iFileDesc;
439 if (FD_ISSET(agent->iRpcFd, &tReadSet))
440 rpc_fd = agent->iRpcFd;
442 if (hb_fd == INVALID_TCP_SOCKET &&
443 rpc_fd == INVALID_TCP_SOCKET)
446 /* process heart beat */
447 if (hb_fd != INVALID_TCP_SOCKET) {
448 /* process the message */
449 rc = process_agent_message(agent, hb_fd);
451 PERROR("msg failure: %s",
454 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
455 int agent_id = agent->id;
456 if (agent->id == master->id) {
457 PERROR("Disconnected from master. Will attempt to reconnect");
458 master_connected = false;
460 iMaxSelectFd = close_agent_connection(agent);
461 release_agent_blk(agent);
462 free_agent_blk(agent_id);
467 if (rpc_fd != INVALID_TCP_SOCKET) {
468 /* process the message */
469 rc = process_agent_message(agent, rpc_fd);
471 PERROR("msg failure: %s",
474 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
475 int agent_id = agent->id;
476 if (agent->id == master->id) {
477 PERROR("Disconnected from master. Will attempt to reconnect");
478 master_connected = false;
480 iMaxSelectFd = close_agent_connection(agent);
481 release_agent_blk(agent);
482 free_agent_blk(agent_id);
485 release_agent_blk(agent);
489 /* establish connection with the master if I'm an agent
490 * and I have not connected to the master yet.
491 * Otherwise send a heart beat
493 if (!master_connected &&
494 strlen(g_lutf_cfg.master_name) != 0) {
495 PDEBUG("Attempting a connection on master %s",
496 g_lutf_cfg.master_name);
497 master = find_free_agent_blk(&info->hb_info.master_address);
499 PERROR("Failed to allocate agent block");
503 iConnFd = establishTCPConnection(
504 info->hb_info.master_address.sin_addr.s_addr,
505 htons(info->hb_info.master_address.sin_port),
509 int master_id = master->id;
511 PERROR("establishTCPConnection failure: %s. Clearing set",
512 lutf_rc2str(iConnFd));
513 iMaxSelectFd = close_agent_connection(master);
514 release_agent_blk(master);
515 free_agent_blk(master_id);
516 PERROR("Disconnected from master. Will attempt to reconnect");
517 master_connected = false;
521 master->iFileDesc = iConnFd;
522 memcpy(&master->addr,
523 &info->hb_info.master_address,
524 sizeof(master->addr));
525 strncpy(master->name, g_lutf_cfg.master_name,
527 master->name[MAX_STR_LEN-1] = '\0';
528 master->node_type = EN_LUTF_MASTER;
529 gethostname(master->hostname, MAX_STR_LEN);
530 master->telnet_port = info->hb_info.agent_telnet_port;
531 release_agent_blk(master);
533 PDEBUG("Connected to master %s on fd %d",
534 master->name, master->iFileDesc);
537 * add the master FD to the select FD set
538 * to be able to process master messages
540 FD_SET(iConnFd, &g_tAllSet);
541 iMaxSelectFd = get_highest_fd();
543 master_connected = true;
544 master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
547 if (info->type == EN_LUTF_AGENT) {
548 rc = send_hb(master, info->hb_info.node_name,
549 info->hb_info.agent_telnet_port,
551 if (rc != EN_LUTF_RC_OK) {
552 master_connected = false;
553 iMaxSelectFd = get_highest_fd();
558 * Get the time stamp and go through each agent
559 * and see if it's still healthy. For agents which
560 * aren't healthy move off to the dead_list.
561 * This operation is only valid if I'm a master
563 gettimeofday(&time_2, NULL);
564 if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) {
565 /* check if HB_TO seconds has passed since the last
566 * time we collected the time */
567 if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
569 /* do the heartbeat check */
570 agent_hb_check(&time_1, info->type);
574 if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
575 lutf_agent_blk_t *agent = NULL;
579 idx = get_next_active_agent(idx, &agent);
580 /* A master doesn't send a heart
583 if (info->type == EN_LUTF_MASTER &&
584 agent->id == master->id)
586 int agent_id = agent->id;
587 rc = send_hb(agent, info->hb_info.node_name,
588 info->hb_info.agent_telnet_port,
590 if (rc != EN_LUTF_RC_OK) {
591 if (agent->id == master->id) {
592 PERROR("Disconnected from master. Will attempt to reconnect");
593 master_connected = false;
595 iMaxSelectFd = close_agent_connection(agent);
596 release_agent_blk(agent);
597 free_agent_blk(agent_id);
599 release_agent_blk(agent);
605 /* store the current time */
609 /* Zero out the g_tAllSet */