Whamcloud - gitweb
1dabe92c447120a0b6b9ec2b4d989ce18ca74bb1
[fs/lustre-release.git] / lustre / tests / lutf / src / lutf_listener.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * This file is part of Lustre, http://www.lustre.org/
5  *
6  * lustre/tests/lutf/lutf_listener.c
7  *
8  * Routines for communicating with other LUTF nodes
9  * over TCP.
10  *
11  * Author: Amir Shehata <ashehata@whamcloud.com>
12  *
13  */
14
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <sys/socket.h>
21 #include <netinet/in.h>
22 #include <netinet/tcp.h>
23 #include <arpa/inet.h>
24 #include <sys/time.h>
25 #include <pthread.h>
26 #include <string.h>
27 #include <signal.h>
28 #include "lutf.h"
29 #include "lutf_python.h"
30 #include "lutf_agent.h"
31 #include "lutf_message.h"
32 #include "lutf_listener.h"
33
34 static fd_set g_tAllSet;
35 static bool g_bShutdown;
36 static int g_iListenFd = INVALID_TCP_SOCKET;
37 static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
38
39 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
40
41 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
42 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
43 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
44
45 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
46         [EN_MSG_TYPE_HB] = process_msg_hb,
47         [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
48         [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
49 };
50
51 void lutf_listener_shutdown(void)
52 {
53         g_bShutdown = true;
54 }
55
56 int get_highest_fd(void)
57 {
58         int iAgentFd = lutf_agent_get_highest_fd();
59         int iMaxFd;
60
61         if (iAgentFd > g_iListenFd)
62                 iMaxFd = iAgentFd;
63         else
64                 iMaxFd = g_iListenFd;
65         PDEBUG("Current highest FD = %d", iMaxFd);
66
67         return iMaxFd;
68 }
69
70 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
71 {
72         lutf_rc_t rc;
73
74         agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
75         rc = python_handle_rpc_request(msg);
76         agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
77
78         return rc;
79 }
80
81 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
82 {
83         lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
84         //PERROR("Procesing HB message");
85
86         /* endian convert message */
87         hb->telnet_port = ntohl(hb->telnet_port);
88         hb->node_type = ntohl(hb->node_type);
89
90         /* update the agent with the information */
91         agent->telnet_port = hb->telnet_port;
92         agent->node_type = hb->node_type;
93         strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
94         agent->hostname[MAX_STR_LEN-1] = '\0';
95         strncpy(agent->name, hb->node_name, MAX_STR_LEN);
96         agent->name[MAX_STR_LEN-1] = '\0';
97         gettimeofday(&agent->time_stamp, NULL);
98
99         return EN_LUTF_RC_OK;
100 }
101
102 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
103 {
104         lutf_rc_t rc;
105         lutf_msg_num_agents_query_t query;
106
107         query.num_agents = get_num_agents();
108         rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
109         if (rc) {
110                 PERROR("failed to send tcp message to get num agents query");
111                 return rc;
112         }
113
114         return EN_LUTF_RC_OK;
115 }
116
117 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
118 {
119         lutf_rc_t rc = EN_LUTF_RC_OK;
120         lutf_message_hdr_t hdr;
121         char *buffer;
122         msg_process_fn_t proc_fn;
123
124         /* get the header first */
125         rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
126                             TCP_READ_TIMEOUT_SEC);
127
128         if (rc)
129                 return rc;
130
131         hdr.version = ntohl(hdr.version);
132         if (hdr.version != LUTF_VERSION_NUMBER) {
133                 PERROR("version %d != %d", hdr.version,
134                        LUTF_VERSION_NUMBER);
135                 return EN_LUTF_RC_BAD_VERSION;
136         }
137
138         /* if the ips don't match ignore the message */
139         if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
140                 return rc;
141
142         hdr.type = ntohl(hdr.type);
143         hdr.len = ntohl(hdr.len);
144
145         buffer = calloc(hdr.len, 1);
146         if (!buffer)
147                 return EN_LUTF_RC_OOM;
148
149         /* get the rest of the message */
150         rc = readTcpMessage(fd, buffer, hdr.len,
151                             TCP_READ_TIMEOUT_SEC);
152
153         if (rc) {
154                 free(buffer);
155                 return rc;
156         }
157
158         /* call the appropriate processing function */
159         proc_fn = msg_process_tbl[hdr.type];
160         if (proc_fn)
161                 rc = proc_fn(buffer, agent);
162
163         free(buffer);
164         return rc;
165 }
166
167 static lutf_rc_t init_comm(unsigned short server_port)
168 {
169         int iFlags;
170         struct sockaddr_in sServAddr;
171
172         signal(SIGPIPE, SIG_IGN);
173
174         /*  Create a socket to listen to.  */
175         g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
176         if (g_iListenFd < 0) {
177                 /*  Cannot create a listening socket.  */
178                 return EN_LUTF_RC_SOCKET_FAIL;
179         }
180
181         /* Set a socket option which will allow us to be quickly restarted
182          * if necessary.
183          */
184         iFlags = 1;
185         if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
186                        sizeof(iFlags)) < 0) {
187                 /*  Cannot change the socket options.  */
188                 closeTcpConnection(g_iListenFd);
189                 return EN_LUTF_RC_FAIL;
190         }
191
192         /*  Bind to our listening socket.  */
193         bzero((char *) &sServAddr, sizeof(sServAddr));
194         sServAddr.sin_family = AF_INET;
195         sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
196         sServAddr.sin_port = htons(server_port);
197
198         if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
199                  sizeof(sServAddr)) < 0) {
200                 /*  Cannot bind our listening socket.  */
201                 closeTcpConnection(g_iListenFd);
202                 return EN_LUTF_RC_BIND_FAILED;
203         }
204
205         /* Let the system know we wish to listen to this port for
206          * connections.
207          */
208         if (listen(g_iListenFd, 2) < 0) {
209                 /*  Cannot listen to socket, close and fail  */
210                 closeTcpConnection(g_iListenFd);
211                 return EN_LUTF_RC_LISTEN_FAILED;
212         }
213
214         /* We want this socket to be non-blocking even though it will be used
215          * in a blocking select call. This is to avoid a problem identified by
216          * Richard Stevens.
217          */
218         iFlags = fcntl(g_iListenFd, F_GETFL, 0);
219         fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
220
221         /*  Add the listening socket to our select() mask.  */
222         FD_ZERO(&g_tAllSet);
223         FD_SET(g_iListenFd, &g_tAllSet);
224
225         return EN_LUTF_RC_OK;
226 }
227
228 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
229                   int type)
230 {
231         lutf_msg_hb_t hb;
232         int rc;
233
234         hb.telnet_port = htonl(telnet_port);
235         hb.node_type = htonl(type);
236         strncpy(hb.node_name, name, MAX_STR_LEN);
237         hb.node_name[MAX_STR_LEN-1] = '\0';
238         gethostname(hb.node_hostname, MAX_STR_LEN);
239
240         /* send the heart beat */
241         rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
242                            sizeof(hb), EN_MSG_TYPE_HB);
243         if (rc != EN_LUTF_RC_OK) {
244                 PERROR("Failed to send heart beat %s\n",
245                         lutf_rc2str(rc));
246         }
247
248         return rc;
249 }
250
251 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
252 {
253         /* we assume the first connection is an HB connection */
254         if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
255                 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
256                         PERROR("agent in unexpected state. state is %s, but HB FD is %d",
257                                agent_state2str(agent), fd);
258                         return EN_LUTF_RC_SYS_ERR;
259                 } else {
260                         PDEBUG("HB Channel Connected: %d", fd);
261                         agent->iFileDesc = fd;
262                         agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
263                         return EN_LUTF_RC_OK;
264                 }
265         } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
266                 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
267                         PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
268                                agent_state2str(agent), fd);
269                         return EN_LUTF_RC_SYS_ERR;
270                 } else {
271                         PDEBUG("RPC Channel Connected: %d", fd);
272                         agent->iRpcFd = fd;
273                         agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
274                         return EN_LUTF_RC_OK;
275                 }
276         }
277
278         PERROR("agent is in an unexpected state on connection %s",
279                agent_state2str(agent));
280         return EN_LUTF_RC_SYS_ERR;
281 }
282
283 void close_agent_connection(lutf_agent_blk_t *agent)
284 {
285         if (agent->iFileDesc != INVALID_TCP_SOCKET) {
286                 FD_CLR(agent->iFileDesc, &g_tAllSet);
287                 closeTcpConnection(agent->iFileDesc);
288                 agent->iFileDesc = -1;
289         }
290         if (agent->iRpcFd != INVALID_TCP_SOCKET) {
291                 FD_CLR(agent->iRpcFd, &g_tAllSet);
292                 closeTcpConnection(agent->iRpcFd);
293                 agent->iRpcFd = -1;
294         }
295         g_iMaxSelectFd = get_highest_fd();
296 }
297
298 void agent_hb_check(struct timeval *t, lutf_type_t me)
299 {
300         lutf_agent_blk_t *agent;
301         int i = 0;
302
303         for (i = 0; i < MAX_NUM_AGENTS; i++) {
304                 agent = find_agent_blk_by_id(i);
305
306                 if (agent && agent->node_type != me) {
307                         if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
308                                 /* agent didn't send a HB move to dead
309                                  * list
310                                  */
311                                 PERROR("agent %s presumed dead", agent->name);
312                                 release_agent_blk(agent, true);
313                                 continue;
314                         }
315                 }
316                 if (agent)
317                         release_agent_blk(agent, false);
318         }
319 }
320
321 /*
322  * lutf_listener_main
323  *   main loop.  Listens for incoming agent connections, and for agent
324  *   messages.  Every period of time it triggers a walk through the agent
325  *   list to see if any of the HBs stopped
326  *
327  *   If I am an Agent, then attempt to connect to the master and add an
328  *   agent block on the list of agents. After successful connection send
329  *   a regular heart beat.
330  *
331  *   Since the master's agent block is on the list of agents and its FD is
332  *   on the select FD set, then if the master sends the agent a message
333  *   the agent should be able to process it.
334  */
335 void *lutf_listener_main(void *usr_data)
336 {
337         int iConnFd;
338         struct sockaddr_in sCliAddr;
339         socklen_t  tCliLen;
340         fd_set tReadSet;
341         int iNReady;
342         int i;
343         lutf_rc_t rc;
344         lutf_agent_blk_t *agent = NULL, *master = NULL;
345         struct timeval time_1, time_2, select_to;
346         lutf_listener_info_t *info;
347         bool master_connected = false;
348
349         info = (lutf_listener_info_t *)usr_data;
350         if ((!info) ||
351             ((info) && (info->listen_port == 0))) {
352                 PERROR("No liston port provided");
353                 return NULL;
354         }
355
356         rc = init_comm(info->listen_port);
357         if (rc) {
358                 PERROR("init_comm failed: %s", lutf_rc2str(rc));
359                 return NULL;
360         }
361
362         agent_init();
363
364         g_iMaxSelectFd = g_iListenFd;
365
366         gettimeofday(&time_2, NULL);
367
368         /*  Main Processing Loop: Keep going until we have reason
369          * to shutdown.
370          */
371         while (!g_bShutdown) {
372                 /*  Wait on our select mask for an event to occur.  */
373                 select_to.tv_sec = HB_TO;
374                 select_to.tv_usec = 0;
375
376                 FD_ZERO(&tReadSet);
377                 tReadSet = g_tAllSet;
378                 iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
379                                  &select_to);
380
381                 release_dead_list_agents();
382
383                 /*  Determine if we failed the select call */
384                 if (iNReady < 0) {
385                         /*  Check to see if we were interrupted by a signal.  */
386                         if ((errno == EINTR) || (errno == EAGAIN)) {
387                                 PERROR("Select failure: errno = %d", errno);
388                         } else if (errno != ECONNABORTED) {
389                                 /* If this is an ECONNABORTED error, just
390                                  * ignore it. Raise a fatal alarm and shut
391                                  * down.
392                                  */
393                                 PERROR("Shutting down Listener thread. errno: %d",
394                                        errno);
395                                 lutf_listener_shutdown();
396                         }
397
398                         /* store the current time */
399                         time_1 = time_2;
400
401                         /* Zero out the g_tAllSet */
402                         FD_ZERO(&g_tAllSet);
403
404                         continue;
405                 }
406
407                 if (FD_ISSET(g_iListenFd, &tReadSet)) {
408                         /*  A new client is trying to connect.  */
409                         tCliLen = sizeof(sCliAddr);
410                         iConnFd = accept(g_iListenFd,
411                                          (struct sockaddr *) &sCliAddr,
412                                          &tCliLen);
413                         if (iConnFd < 0) {
414                                 /*  Cannot accept new connection...
415                                  * just ignore.
416                                  */
417                                 if (errno != EWOULDBLOCK)
418                                         PERROR("Error on accept(), errno = %d",
419                                                errno);
420                         } else {
421                                 /* Try to see if we have an agent
422                                  * with the same address, since
423                                  * agents can have multiple tcp
424                                  * connections open
425                                  */
426                                 agent = find_create_agent_blk_by_addr(&sCliAddr);
427                                 if (!agent) {
428                                         /*  Cannot support more clients...just ignore.  */
429                                         PERROR("Cannot accept more clients");
430                                         closeTcpConnection(iConnFd);
431                                 } else {
432                                         int iOption, iFlags;
433
434                                         rc = complete_agent_connection(agent,
435                                                                        iConnFd);
436                                         if (rc != EN_LUTF_RC_OK) {
437                                                 release_agent_blk(agent, true);
438                                                 continue;
439                                         }
440
441                                         /* all nodes listen on the
442                                          * same port
443                                          */
444                                         agent->listen_port = info->listen_port;
445
446                                         /*  Add new client to our select mask.  */
447                                         FD_SET(iConnFd, &g_tAllSet);
448                                         g_iMaxSelectFd = get_highest_fd();
449
450                                         /* Ok, it seems that the connected socket gains
451                                          * the same flags as the listen socket.  We want
452                                          * to make it blocking here.
453                                          */
454                                         iFlags = fcntl(iConnFd, F_GETFL, 0);
455                                         fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
456
457                                         /*  And, we want to turn off Nagle's algorithm to
458                                          *  reduce latency
459                                          */
460                                         iOption = 1;
461                                         setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
462                                                    (void *)&iOption,
463                                                    sizeof(iOption));
464
465                                         PDEBUG("Received a connection from %s on FD %d\n",
466                                                inet_ntoa(agent->addr.sin_addr), iConnFd);
467                                 }
468                         }
469
470                         /*  See if there are other messages waiting.  */
471                         iNReady--;
472                 }
473
474                 /* need to iterate through the clients and see if a
475                  * message was sent to any of them
476                  */
477                 for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
478                         /* reset the return code to avoid misbehaving on previous
479                          * returns
480                          */
481                         rc = EN_LUTF_RC_OK;
482
483                         if ((agent = find_agent_blk_by_id(i))) {
484                                 int hb_fd = INVALID_TCP_SOCKET;
485                                 int rpc_fd = INVALID_TCP_SOCKET;
486
487                                 release_agent_blk(agent, false);
488
489                                 if (FD_ISSET(agent->iFileDesc, &tReadSet))
490                                         hb_fd = agent->iFileDesc;
491                                 if (FD_ISSET(agent->iRpcFd, &tReadSet))
492                                         rpc_fd = agent->iRpcFd;
493
494                                 if (hb_fd == INVALID_TCP_SOCKET &&
495                                     rpc_fd == INVALID_TCP_SOCKET)
496                                         continue;
497
498                                 /* process heart beat */
499                                 if (hb_fd != INVALID_TCP_SOCKET) {
500                                         /* process the message */
501                                         rc = process_agent_message(agent, hb_fd);
502                                         if (rc)
503                                                 PERROR("msg failure: %s",
504                                                        lutf_rc2str(rc));
505                                 }
506                                 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
507                                         if (agent->id == master->id) {
508                                                 PERROR("Disconnected from master. Will attempt to reconnect");
509                                                 master_connected = false;
510                                         }
511                                         release_agent_blk(agent, true);
512                                         continue;
513                                 }
514
515                                 /* process rpc */
516                                 if (rpc_fd != INVALID_TCP_SOCKET) {
517                                         /* process the message */
518                                         rc = process_agent_message(agent, rpc_fd);
519                                         if (rc)
520                                                 PERROR("msg failure: %s",
521                                                        lutf_rc2str(rc));
522                                 }
523                                 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
524                                         if (agent->id == master->id) {
525                                                 PERROR("Disconnected from master. Will attempt to reconnect");
526                                                 master_connected = false;
527                                         }
528                                         release_agent_blk(agent, true);
529                                         continue;
530                                 }
531                         }
532                 }
533
534                 /* establish connection with the master if I'm an agent
535                  * and I have not connected to the master yet.
536                  * Otherwise send a heart beat
537                  */
538                 if (!master_connected &&
539                     strlen(g_lutf_cfg.master_name) != 0) {
540                         PDEBUG("Attempting a connection on master %s",
541                                g_lutf_cfg.master_name);
542                         master = find_free_agent_blk(&info->hb_info.master_address);
543                         if (!master) {
544                                 PERROR("Failed to allocate agent block");
545                                 continue;
546                         }
547
548                         iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
549                                                          htons(info->hb_info.master_address.sin_port),
550                                                          true, false);
551                         if (iConnFd < 0) {
552                                 PERROR("establishTCPConnection failure: %s. Clearing set",
553                                         lutf_rc2str(iConnFd));
554                                 release_agent_blk(master, true);
555                                 PERROR("Disconnected from master. Will attempt to reconnect");
556                                 master_connected = false;
557                                 continue;
558                         }
559
560                         master->iFileDesc = iConnFd;
561                         memcpy(&master->addr,
562                                &info->hb_info.master_address,
563                                sizeof(master->addr));
564                         if (g_lutf_cfg.master_name) {
565                                 strncpy(master->name, g_lutf_cfg.master_name,
566                                         MAX_STR_LEN);
567                                 master->name[MAX_STR_LEN-1] = '\0';
568                         }
569                         master->node_type = EN_LUTF_MASTER;
570                         gethostname(master->hostname, MAX_STR_LEN);
571                         master->telnet_port = info->hb_info.agent_telnet_port;
572
573                         PDEBUG("Connected to master %s on fd %d",
574                                master->name, master->iFileDesc);
575
576                         /*
577                          * add the master FD to the select FD set
578                          * to be able to process master messages
579                          */
580                         FD_SET(iConnFd, &g_tAllSet);
581                         g_iMaxSelectFd = get_highest_fd();
582
583                         master_connected = true;
584                         master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
585                 }
586                 /*
587                 if (info->type == EN_LUTF_AGENT) {
588                         rc = send_hb(master, info->hb_info.node_name,
589                                      info->hb_info.agent_telnet_port,
590                                      info->type);
591                         if (rc != EN_LUTF_RC_OK) {
592                                 master_connected = false;
593                                 g_iMaxSelectFd = get_highest_fd();
594                         }
595                 }
596                 */
597                 /*
598                  * Get the time stamp and go through each agent
599                  * and see if it's still healthy.  For agents which
600                  * aren't healthy move off to the dead_list.
601                  * This operation is only valid if I'm a master
602                  */
603                 gettimeofday(&time_2, NULL);
604                 if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
605                         /* check if HB_TO seconds has passed since the last
606                          * time we collected the time
607                          */
608                         if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
609                                 /* do the heartbeat check */
610                                 agent_hb_check(&time_1, info->type);
611                         }
612                 }
613
614                 if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
615                         lutf_agent_blk_t *agent = NULL;
616                         int idx = 0;
617
618                         do {
619                                 idx = get_next_active_agent(idx, &agent);
620                                 /* A master doesn't send a heart
621                                  * beat to himself
622                                  */
623                                 if (agent) {
624                                         bool dead = false;
625                                         if (info->type == EN_LUTF_MASTER &&
626                                             agent->id == master->id)
627                                                 continue;
628
629                                         rc = send_hb(agent, info->hb_info.node_name,
630                                                      info->hb_info.agent_telnet_port,
631                                                      info->type);
632                                         if (rc != EN_LUTF_RC_OK) {
633                                                 if (agent->id == master->id) {
634                                                         PERROR("Disconnected from master. Will attempt to reconnect");
635                                                         master_connected = false;
636                                                 }
637                                                 dead = true;
638                                         }
639                                         release_agent_blk(agent, dead);
640                                 }
641                         } while (agent);
642                 }
643
644                 /* store the current time */
645                 memcpy(&time_1, &time_2, sizeof(time_1));
646         }
647
648         /* Zero out the g_tAllSet */
649         FD_ZERO(&g_tAllSet);
650
651         return NULL;
652 }