1 // SPDX-License-Identifier: GPL-2.0
4 * This file is part of Lustre, http://www.lustre.org/
6 * lustre/tests/lutf/lutf_listener.c
8 * Routines for communicating with other LUTF nodes
11 * Author: Amir Shehata <ashehata@whamcloud.com>
20 #include <sys/socket.h>
21 #include <netinet/in.h>
22 #include <netinet/tcp.h>
23 #include <arpa/inet.h>
29 #include "lutf_python.h"
30 #include "lutf_agent.h"
31 #include "lutf_message.h"
32 #include "lutf_listener.h"
/* Master read set for select(). The listening socket and every accepted
 * or connected agent FD are added with FD_SET and removed with FD_CLR;
 * the main loop copies it into a scratch set before each select() call,
 * since select() overwrites the set it is given.
 */
34 static fd_set g_tAllSet;
/* Loop-exit flag for lutf_listener_main(); the loop runs while it is false.
 * NOTE(review): presumably set by lutf_listener_shutdown(), whose body is
 * not visible in this excerpt — confirm.
 */
35 static bool g_bShutdown;
/* Listening TCP socket created in init_comm(); INVALID_TCP_SOCKET until then. */
36 static int g_iListenFd = INVALID_TCP_SOCKET;
/* Highest FD in g_tAllSet; select() is called with g_iMaxSelectFd + 1.
 * Recomputed via get_highest_fd() after agent FDs are added or removed.
 */
37 static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
/* Handler for one message type. It receives the message payload (the bytes
 * read after the lutf_message_hdr_t) and the agent block the message
 * arrived on, and returns a lutf_rc_t status.
 *
 * NOTE(review): process_agent_message() indexes msg_process_tbl with
 * hdr.type taken off the wire; confirm it is range-checked against
 * EN_MSG_TYPE_MAX and that NULL table slots are rejected (the check is
 * not visible in this excerpt).
 */
39 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
/* Forward declarations so msg_process_tbl can reference handlers that are
 * defined further down in the file.
 */
41 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
42 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
43 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
45 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
46 [EN_MSG_TYPE_HB] = process_msg_hb,
47 [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
48 [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
51 void lutf_listener_shutdown(void)
56 int get_highest_fd(void)
58 int iAgentFd = lutf_agent_get_highest_fd();
61 if (iAgentFd > g_iListenFd)
65 PDEBUG("Current highest FD = %d", iMaxFd);
70 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
74 agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
75 rc = python_handle_rpc_request(msg);
76 agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
81 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
83 lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
84 //PERROR("Procesing HB message");
86 /* endian convert message */
87 hb->telnet_port = ntohl(hb->telnet_port);
88 hb->node_type = ntohl(hb->node_type);
90 /* update the agent with the information */
91 agent->telnet_port = hb->telnet_port;
92 agent->node_type = hb->node_type;
93 strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
94 agent->hostname[MAX_STR_LEN-1] = '\0';
95 strncpy(agent->name, hb->node_name, MAX_STR_LEN);
96 agent->name[MAX_STR_LEN-1] = '\0';
97 gettimeofday(&agent->time_stamp, NULL);
102 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
105 lutf_msg_num_agents_query_t query;
107 query.num_agents = get_num_agents();
108 rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
110 PERROR("failed to send tcp message to get num agents query");
114 return EN_LUTF_RC_OK;
117 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
119 lutf_rc_t rc = EN_LUTF_RC_OK;
120 lutf_message_hdr_t hdr;
122 msg_process_fn_t proc_fn;
124 /* get the header first */
125 rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
126 TCP_READ_TIMEOUT_SEC);
131 hdr.version = ntohl(hdr.version);
132 if (hdr.version != LUTF_VERSION_NUMBER) {
133 PERROR("version %d != %d", hdr.version,
134 LUTF_VERSION_NUMBER);
135 return EN_LUTF_RC_BAD_VERSION;
138 /* if the ips don't match ignore the message */
139 if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
142 hdr.type = ntohl(hdr.type);
143 hdr.len = ntohl(hdr.len);
145 buffer = calloc(hdr.len, 1);
147 return EN_LUTF_RC_OOM;
149 /* get the rest of the message */
150 rc = readTcpMessage(fd, buffer, hdr.len,
151 TCP_READ_TIMEOUT_SEC);
158 /* call the appropriate processing function */
159 proc_fn = msg_process_tbl[hdr.type];
161 rc = proc_fn(buffer, agent);
167 static lutf_rc_t init_comm(unsigned short server_port)
170 struct sockaddr_in sServAddr;
172 signal(SIGPIPE, SIG_IGN);
174 /* Create a socket to listen to. */
175 g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
176 if (g_iListenFd < 0) {
177 /* Cannot create a listening socket. */
178 return EN_LUTF_RC_SOCKET_FAIL;
181 /* Set a socket option which will allow us to be quickly restarted
185 if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
186 sizeof(iFlags)) < 0) {
187 /* Cannot change the socket options. */
188 closeTcpConnection(g_iListenFd);
189 return EN_LUTF_RC_FAIL;
192 /* Bind to our listening socket. */
193 bzero((char *) &sServAddr, sizeof(sServAddr));
194 sServAddr.sin_family = AF_INET;
195 sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
196 sServAddr.sin_port = htons(server_port);
198 if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
199 sizeof(sServAddr)) < 0) {
200 /* Cannot bind our listening socket. */
201 closeTcpConnection(g_iListenFd);
202 return EN_LUTF_RC_BIND_FAILED;
205 /* Let the system know we wish to listen to this port for
208 if (listen(g_iListenFd, 2) < 0) {
209 /* Cannot listen to socket, close and fail */
210 closeTcpConnection(g_iListenFd);
211 return EN_LUTF_RC_LISTEN_FAILED;
214 /* We want this socket to be non-blocking even though it will be used
215 * in a blocking select call. This is to avoid a problem identified by
218 iFlags = fcntl(g_iListenFd, F_GETFL, 0);
219 fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
221 /* Add the listening socket to our select() mask. */
223 FD_SET(g_iListenFd, &g_tAllSet);
225 return EN_LUTF_RC_OK;
228 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
234 hb.telnet_port = htonl(telnet_port);
235 hb.node_type = htonl(type);
236 strncpy(hb.node_name, name, MAX_STR_LEN);
237 hb.node_name[MAX_STR_LEN-1] = '\0';
238 gethostname(hb.node_hostname, MAX_STR_LEN);
240 /* send the heart beat */
241 rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
242 sizeof(hb), EN_MSG_TYPE_HB);
243 if (rc != EN_LUTF_RC_OK) {
244 PERROR("Failed to send heart beat %s\n",
251 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
253 /* we assume the first connection is an HB connection */
254 if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
255 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
256 PERROR("agent in unexpected state. state is %s, but HB FD is %d",
257 agent_state2str(agent), fd);
258 return EN_LUTF_RC_SYS_ERR;
260 PDEBUG("HB Channel Connected: %d", fd);
261 agent->iFileDesc = fd;
262 agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
263 return EN_LUTF_RC_OK;
265 } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
266 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
267 PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
268 agent_state2str(agent), fd);
269 return EN_LUTF_RC_SYS_ERR;
271 PDEBUG("RPC Channel Connected: %d", fd);
273 agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
274 return EN_LUTF_RC_OK;
278 PERROR("agent is in an unexpected state on connection %s",
279 agent_state2str(agent));
280 return EN_LUTF_RC_SYS_ERR;
283 void close_agent_connection(lutf_agent_blk_t *agent)
285 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
286 FD_CLR(agent->iFileDesc, &g_tAllSet);
287 closeTcpConnection(agent->iFileDesc);
288 agent->iFileDesc = -1;
290 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
291 FD_CLR(agent->iRpcFd, &g_tAllSet);
292 closeTcpConnection(agent->iRpcFd);
295 g_iMaxSelectFd = get_highest_fd();
298 void agent_hb_check(struct timeval *t, lutf_type_t me)
300 lutf_agent_blk_t *agent;
303 for (i = 0; i < MAX_NUM_AGENTS; i++) {
304 agent = find_agent_blk_by_id(i);
306 if (agent && agent->node_type != me) {
307 if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
308 /* agent didn't send a HB move to dead
311 PERROR("agent %s presumed dead", agent->name);
312 release_agent_blk(agent, true);
317 release_agent_blk(agent, false);
323 * main loop. Listens for incoming agent connections, and for agent
324 * messages. Every period of time it triggers a walk through the agent
325 * list to see if any of the HBs stopped
327 * If I am an Agent, then attempt to connect to the master and add an
328 * agent block on the list of agents. After successful connection send
329 * a regular heart beat.
331 * Since the master's agent block is on the list of agents and its FD is
332 * on the select FD set, then if the master sends the agent a message
333 * the agent should be able to process it.
335 void *lutf_listener_main(void *usr_data)
338 struct sockaddr_in sCliAddr;
344 lutf_agent_blk_t *agent = NULL, *master = NULL;
345 struct timeval time_1, time_2, select_to;
346 lutf_listener_info_t *info;
347 bool master_connected = false;
349 info = (lutf_listener_info_t *)usr_data;
351 ((info) && (info->listen_port == 0))) {
352 PERROR("No liston port provided");
356 rc = init_comm(info->listen_port);
358 PERROR("init_comm failed: %s", lutf_rc2str(rc));
364 g_iMaxSelectFd = g_iListenFd;
366 gettimeofday(&time_2, NULL);
368 /* Main Processing Loop: Keep going until we have reason
371 while (!g_bShutdown) {
372 /* Wait on our select mask for an event to occur. */
373 select_to.tv_sec = HB_TO;
374 select_to.tv_usec = 0;
377 tReadSet = g_tAllSet;
378 iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
381 release_dead_list_agents();
383 /* Determine if we failed the select call */
385 /* Check to see if we were interrupted by a signal. */
386 if ((errno == EINTR) || (errno == EAGAIN)) {
387 PERROR("Select failure: errno = %d", errno);
388 } else if (errno != ECONNABORTED) {
389 /* If this is an ECONNABORTED error, just
390 * ignore it. Raise a fatal alarm and shut
393 PERROR("Shutting down Listener thread. errno: %d",
395 lutf_listener_shutdown();
398 /* store the current time */
401 /* Zero out the g_tAllSet */
407 if (FD_ISSET(g_iListenFd, &tReadSet)) {
408 /* A new client is trying to connect. */
409 tCliLen = sizeof(sCliAddr);
410 iConnFd = accept(g_iListenFd,
411 (struct sockaddr *) &sCliAddr,
414 /* Cannot accept new connection...
417 if (errno != EWOULDBLOCK)
418 PERROR("Error on accept(), errno = %d",
421 /* Try to see if we have an agent
422 * with the same address, since
423 * agents can have multiple tcp
426 agent = find_create_agent_blk_by_addr(&sCliAddr);
428 /* Cannot support more clients...just ignore. */
429 PERROR("Cannot accept more clients");
430 closeTcpConnection(iConnFd);
434 rc = complete_agent_connection(agent,
436 if (rc != EN_LUTF_RC_OK) {
437 release_agent_blk(agent, true);
441 /* all nodes listen on the
444 agent->listen_port = info->listen_port;
446 /* Add new client to our select mask. */
447 FD_SET(iConnFd, &g_tAllSet);
448 g_iMaxSelectFd = get_highest_fd();
450 /* Ok, it seems that the connected socket gains
451 * the same flags as the listen socket. We want
452 * to make it blocking here.
454 iFlags = fcntl(iConnFd, F_GETFL, 0);
455 fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
457 /* And, we want to turn off Nagle's algorithm to
461 setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
465 PDEBUG("Received a connection from %s on FD %d\n",
466 inet_ntoa(agent->addr.sin_addr), iConnFd);
470 /* See if there are other messages waiting. */
474 /* need to iterate through the clients and see if a
475 * message was sent to any of them
477 for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
478 /* reset the return code to avoid misbehaving on previous
483 if ((agent = find_agent_blk_by_id(i))) {
484 int hb_fd = INVALID_TCP_SOCKET;
485 int rpc_fd = INVALID_TCP_SOCKET;
487 release_agent_blk(agent, false);
489 if (FD_ISSET(agent->iFileDesc, &tReadSet))
490 hb_fd = agent->iFileDesc;
491 if (FD_ISSET(agent->iRpcFd, &tReadSet))
492 rpc_fd = agent->iRpcFd;
494 if (hb_fd == INVALID_TCP_SOCKET &&
495 rpc_fd == INVALID_TCP_SOCKET)
498 /* process heart beat */
499 if (hb_fd != INVALID_TCP_SOCKET) {
500 /* process the message */
501 rc = process_agent_message(agent, hb_fd);
503 PERROR("msg failure: %s",
506 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
507 if (agent->id == master->id) {
508 PERROR("Disconnected from master. Will attempt to reconnect");
509 master_connected = false;
511 release_agent_blk(agent, true);
516 if (rpc_fd != INVALID_TCP_SOCKET) {
517 /* process the message */
518 rc = process_agent_message(agent, rpc_fd);
520 PERROR("msg failure: %s",
523 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
524 if (agent->id == master->id) {
525 PERROR("Disconnected from master. Will attempt to reconnect");
526 master_connected = false;
528 release_agent_blk(agent, true);
534 /* establish connection with the master if I'm an agent
535 * and I have not connected to the master yet.
536 * Otherwise send a heart beat
538 if (!master_connected &&
539 strlen(g_lutf_cfg.master_name) != 0) {
540 PDEBUG("Attempting a connection on master %s",
541 g_lutf_cfg.master_name);
542 master = find_free_agent_blk(&info->hb_info.master_address);
544 PERROR("Failed to allocate agent block");
548 iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
549 htons(info->hb_info.master_address.sin_port),
552 PERROR("establishTCPConnection failure: %s. Clearing set",
553 lutf_rc2str(iConnFd));
554 release_agent_blk(master, true);
555 PERROR("Disconnected from master. Will attempt to reconnect");
556 master_connected = false;
560 master->iFileDesc = iConnFd;
561 memcpy(&master->addr,
562 &info->hb_info.master_address,
563 sizeof(master->addr));
564 if (g_lutf_cfg.master_name) {
565 strncpy(master->name, g_lutf_cfg.master_name,
567 master->name[MAX_STR_LEN-1] = '\0';
569 master->node_type = EN_LUTF_MASTER;
570 gethostname(master->hostname, MAX_STR_LEN);
571 master->telnet_port = info->hb_info.agent_telnet_port;
573 PDEBUG("Connected to master %s on fd %d",
574 master->name, master->iFileDesc);
577 * add the master FD to the select FD set
578 * to be able to process master messages
580 FD_SET(iConnFd, &g_tAllSet);
581 g_iMaxSelectFd = get_highest_fd();
583 master_connected = true;
584 master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
587 if (info->type == EN_LUTF_AGENT) {
588 rc = send_hb(master, info->hb_info.node_name,
589 info->hb_info.agent_telnet_port,
591 if (rc != EN_LUTF_RC_OK) {
592 master_connected = false;
593 g_iMaxSelectFd = get_highest_fd();
598 * Get the time stamp and go through each agent
599 * and see if it's still healthy. For agents which
600 * aren't healthy move off to the dead_list.
601 * This operation is only valid if I'm a master
603 gettimeofday(&time_2, NULL);
604 if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
605 /* check if HB_TO seconds has passed since the last
606 * time we collected the time
608 if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
609 /* do the heartbeat check */
610 agent_hb_check(&time_1, info->type);
614 if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
615 lutf_agent_blk_t *agent = NULL;
619 idx = get_next_active_agent(idx, &agent);
620 /* A master doesn't send a heart
625 if (info->type == EN_LUTF_MASTER &&
626 agent->id == master->id)
629 rc = send_hb(agent, info->hb_info.node_name,
630 info->hb_info.agent_telnet_port,
632 if (rc != EN_LUTF_RC_OK) {
633 if (agent->id == master->id) {
634 PERROR("Disconnected from master. Will attempt to reconnect");
635 master_connected = false;
639 release_agent_blk(agent, dead);
644 /* store the current time */
645 memcpy(&time_1, &time_2, sizeof(time_1));
648 /* Zero out the g_tAllSet */