Whamcloud - gitweb
LU-14093 lutf: fix build with gcc10
[fs/lustre-release.git] / lustre / tests / lutf / src / lutf_listener.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <netinet/tcp.h>
9 #include <arpa/inet.h>
10 #include <sys/time.h>
11 #include <pthread.h>
12 #include <string.h>
13 #include <signal.h>
14 #include "lutf.h"
15 #include "lutf_python.h"
16 #include "lutf_agent.h"
17 #include "lutf_message.h"
18 #include "lutf_listener.h"
19
20 static fd_set g_tAllSet;
21 static bool g_bShutdown;
22 static int g_iListenFd = INVALID_TCP_SOCKET;
23 static int g_iMaxSelectFd = INVALID_TCP_SOCKET;
24
25 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
26
27 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
28 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
29 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
30
31 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
32         [EN_MSG_TYPE_HB] = process_msg_hb,
33         [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
34         [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
35 };
36
37 void lutf_listener_shutdown(void)
38 {
39         g_bShutdown = true;
40 }
41
42 int get_highest_fd(void)
43 {
44         int iAgentFd = lutf_agent_get_highest_fd();
45         int iMaxFd;
46
47         if (iAgentFd > g_iListenFd)
48                 iMaxFd = iAgentFd;
49         else
50                 iMaxFd = g_iListenFd;
51         PDEBUG("Current highest FD = %d", iMaxFd);
52
53         return iMaxFd;
54 }
55
56 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
57 {
58         lutf_rc_t rc;
59
60         agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
61         rc = python_handle_rpc_request(msg);
62         agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
63
64         return rc;
65 }
66
67 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
68 {
69         lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
70         //PERROR("Procesing HB message");
71
72         /* endian convert message */
73         hb->telnet_port = ntohl(hb->telnet_port);
74         hb->node_type = ntohl(hb->node_type);
75
76         /* update the agent with the information */
77         agent->telnet_port = hb->telnet_port;
78         agent->node_type = hb->node_type;
79         strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
80         agent->hostname[MAX_STR_LEN-1] = '\0';
81         strncpy(agent->name, hb->node_name, MAX_STR_LEN);
82         agent->name[MAX_STR_LEN-1] = '\0';
83         gettimeofday(&agent->time_stamp, NULL);
84
85         return EN_LUTF_RC_OK;
86 }
87
88 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
89 {
90         lutf_rc_t rc;
91         lutf_msg_num_agents_query_t query;
92
93         query.num_agents = get_num_agents();
94         rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
95         if (rc) {
96                 PERROR("failed to send tcp message to get num agents query");
97                 return rc;
98         }
99
100         return EN_LUTF_RC_OK;
101 }
102
103 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
104 {
105         lutf_rc_t rc = EN_LUTF_RC_OK;
106         lutf_message_hdr_t hdr;
107         char *buffer;
108         msg_process_fn_t proc_fn;
109
110         /* get the header first */
111         rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
112                             TCP_READ_TIMEOUT_SEC);
113
114         if (rc)
115                 return rc;
116
117         hdr.version = ntohl(hdr.version);
118         if (hdr.version != LUTF_VERSION_NUMBER) {
119                 PERROR("version %d != %d", hdr.version,
120                        LUTF_VERSION_NUMBER);
121                 return EN_LUTF_RC_BAD_VERSION;
122         }
123
124         /* if the ips don't match ignore the message */
125         if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
126                 return rc;
127
128         hdr.type = ntohl(hdr.type);
129         hdr.len = ntohl(hdr.len);
130
131         buffer = calloc(hdr.len, 1);
132         if (!buffer)
133                 return EN_LUTF_RC_OOM;
134
135         /* get the rest of the message */
136         rc = readTcpMessage(fd, buffer, hdr.len,
137                             TCP_READ_TIMEOUT_SEC);
138
139         if (rc) {
140                 free(buffer);
141                 return rc;
142         }
143
144         /* call the appropriate processing function */
145         proc_fn = msg_process_tbl[hdr.type];
146         if (proc_fn)
147                 rc = proc_fn(buffer, agent);
148
149         free(buffer);
150         return rc;
151 }
152
153 static lutf_rc_t init_comm(unsigned short server_port)
154 {
155         int iFlags;
156         struct sockaddr_in sServAddr;
157
158         signal(SIGPIPE, SIG_IGN);
159
160         /*  Create a socket to listen to.  */
161         g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
162         if (g_iListenFd < 0) {
163                 /*  Cannot create a listening socket.  */
164                 return EN_LUTF_RC_SOCKET_FAIL;
165         }
166
167         /* Set a socket option which will allow us to be quickly restarted
168          * if necessary.
169          */
170         iFlags = 1;
171         if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
172                        sizeof(iFlags)) < 0) {
173                 /*  Cannot change the socket options.  */
174                 closeTcpConnection(g_iListenFd);
175                 return EN_LUTF_RC_FAIL;
176         }
177
178         /*  Bind to our listening socket.  */
179         bzero((char *) &sServAddr, sizeof(sServAddr));
180         sServAddr.sin_family = AF_INET;
181         sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
182         sServAddr.sin_port = htons(server_port);
183
184         if (bind(g_iListenFd, (struct sockaddr *) &sServAddr,
185                  sizeof(sServAddr)) < 0) {
186                 /*  Cannot bind our listening socket.  */
187                 closeTcpConnection(g_iListenFd);
188                 return EN_LUTF_RC_BIND_FAILED;
189         }
190
191         /* Let the system know we wish to listen to this port for
192          * connections.
193          */
194         if (listen(g_iListenFd, 2) < 0) {
195                 /*  Cannot listen to socket, close and fail  */
196                 closeTcpConnection(g_iListenFd);
197                 return EN_LUTF_RC_LISTEN_FAILED;
198         }
199
200         /* We want this socket to be non-blocking even though it will be used
201          * in a blocking select call. This is to avoid a problem identified by
202          * Richard Stevens.
203          */
204         iFlags = fcntl(g_iListenFd, F_GETFL, 0);
205         fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
206
207         /*  Add the listening socket to our select() mask.  */
208         FD_ZERO(&g_tAllSet);
209         FD_SET(g_iListenFd, &g_tAllSet);
210
211         return EN_LUTF_RC_OK;
212 }
213
214 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
215                   int type)
216 {
217         lutf_msg_hb_t hb;
218         int rc;
219
220         hb.telnet_port = htonl(telnet_port);
221         hb.node_type = htonl(type);
222         strncpy(hb.node_name, name, MAX_STR_LEN);
223         hb.node_name[MAX_STR_LEN-1] = '\0';
224         gethostname(hb.node_hostname, MAX_STR_LEN);
225
226         /* send the heart beat */
227         rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
228                            sizeof(hb), EN_MSG_TYPE_HB);
229         if (rc != EN_LUTF_RC_OK) {
230                 PERROR("Failed to send heart beat %s\n",
231                         lutf_rc2str(rc));
232         }
233
234         return rc;
235 }
236
237 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
238 {
239         /* we assume the first connection is an HB connection */
240         if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
241                 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
242                         PERROR("agent in unexpected state. state is %s, but HB FD is %d",
243                                agent_state2str(agent), fd);
244                         return EN_LUTF_RC_SYS_ERR;
245                 } else {
246                         PDEBUG("HB Channel Connected: %d", fd);
247                         agent->iFileDesc = fd;
248                         agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
249                         return EN_LUTF_RC_OK;
250                 }
251         } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
252                 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
253                         PERROR("agent in unexpected state. state is %s, but RPC FD is %d",
254                                agent_state2str(agent), fd);
255                         return EN_LUTF_RC_SYS_ERR;
256                 } else {
257                         PDEBUG("RPC Channel Connected: %d", fd);
258                         agent->iRpcFd = fd;
259                         agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
260                         return EN_LUTF_RC_OK;
261                 }
262         }
263
264         PERROR("agent is in an unexpected state on connection %s",
265                agent_state2str(agent));
266         return EN_LUTF_RC_SYS_ERR;
267 }
268
269 void close_agent_connection(lutf_agent_blk_t *agent)
270 {
271         if (agent->iFileDesc != INVALID_TCP_SOCKET) {
272                 FD_CLR(agent->iFileDesc, &g_tAllSet);
273                 closeTcpConnection(agent->iFileDesc);
274                 agent->iFileDesc = -1;
275         }
276         if (agent->iRpcFd != INVALID_TCP_SOCKET) {
277                 FD_CLR(agent->iRpcFd, &g_tAllSet);
278                 closeTcpConnection(agent->iRpcFd);
279                 agent->iRpcFd = -1;
280         }
281         g_iMaxSelectFd = get_highest_fd();
282 }
283
284 void agent_hb_check(struct timeval *t, lutf_type_t me)
285 {
286         lutf_agent_blk_t *agent;
287         int i = 0;
288
289         for (i = 0; i < MAX_NUM_AGENTS; i++) {
290                 agent = find_agent_blk_by_id(i);
291
292                 if (agent && agent->node_type != me) {
293                         if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
294                                 /* agent didn't send a HB move to dead
295                                  * list
296                                  */
297                                 PERROR("agent %s presumed dead", agent->name);
298                                 release_agent_blk(agent, true);
299                                 continue;
300                         }
301                 }
302                 if (agent)
303                         release_agent_blk(agent, false);
304         }
305 }
306
307 /*
308  * lutf_listener_main
309  *   main loop.  Listens for incoming agent connections, and for agent
310  *   messages.  Every period of time it triggers a walk through the agent
311  *   list to see if any of the HBs stopped
312  *
313  *   If I am an Agent, then attempt to connect to the master and add an
314  *   agent block on the list of agents. After successful connection send
315  *   a regular heart beat.
316  *
317  *   Since the master's agent block is on the list of agents and its FD is
318  *   on the select FD set, then if the master sends the agent a message
319  *   the agent should be able to process it.
320  */
321 void *lutf_listener_main(void *usr_data)
322 {
323         int iConnFd;
324         struct sockaddr_in sCliAddr;
325         socklen_t  tCliLen;
326         fd_set tReadSet;
327         int iNReady;
328         int i;
329         lutf_rc_t rc;
330         lutf_agent_blk_t *agent = NULL, *master = NULL;
331         struct timeval time_1, time_2, select_to;
332         lutf_listener_info_t *info;
333         bool master_connected = false;
334
335         info = (lutf_listener_info_t *)usr_data;
336         if ((!info) ||
337             ((info) && (info->listen_port == 0))) {
338                 PERROR("No liston port provided");
339                 return NULL;
340         }
341
342         rc = init_comm(info->listen_port);
343         if (rc) {
344                 PERROR("init_comm failed: %s", lutf_rc2str(rc));
345                 return NULL;
346         }
347
348         agent_init();
349
350         g_iMaxSelectFd = g_iListenFd;
351
352         gettimeofday(&time_2, NULL);
353
354         /*  Main Processing Loop: Keep going until we have reason
355          * to shutdown.
356          */
357         while (!g_bShutdown) {
358                 /*  Wait on our select mask for an event to occur.  */
359                 select_to.tv_sec = HB_TO;
360                 select_to.tv_usec = 0;
361
362                 FD_ZERO(&tReadSet);
363                 tReadSet = g_tAllSet;
364                 iNReady = select(g_iMaxSelectFd + 1, &tReadSet, NULL, NULL,
365                                  &select_to);
366
367                 release_dead_list_agents();
368
369                 /*  Determine if we failed the select call */
370                 if (iNReady < 0) {
371                         /*  Check to see if we were interrupted by a signal.  */
372                         if ((errno == EINTR) || (errno == EAGAIN)) {
373                                 PERROR("Select failure: errno = %d", errno);
374                         } else if (errno != ECONNABORTED) {
375                                 /* If this is an ECONNABORTED error, just
376                                  * ignore it. Raise a fatal alarm and shut
377                                  * down.
378                                  */
379                                 PERROR("Shutting down Listener thread. errno: %d",
380                                        errno);
381                                 lutf_listener_shutdown();
382                         }
383
384                         /* store the current time */
385                         time_1 = time_2;
386
387                         /* Zero out the g_tAllSet */
388                         FD_ZERO(&g_tAllSet);
389
390                         continue;
391                 }
392
393                 if (FD_ISSET(g_iListenFd, &tReadSet)) {
394                         /*  A new client is trying to connect.  */
395                         tCliLen = sizeof(sCliAddr);
396                         iConnFd = accept(g_iListenFd,
397                                          (struct sockaddr *) &sCliAddr,
398                                          &tCliLen);
399                         if (iConnFd < 0) {
400                                 /*  Cannot accept new connection...
401                                  * just ignore.
402                                  */
403                                 if (errno != EWOULDBLOCK)
404                                         PERROR("Error on accept(), errno = %d",
405                                                errno);
406                         } else {
407                                 /* Try to see if we have an agent
408                                  * with the same address, since
409                                  * agents can have multiple tcp
410                                  * connections open
411                                  */
412                                 agent = find_create_agent_blk_by_addr(&sCliAddr);
413                                 if (!agent) {
414                                         /*  Cannot support more clients...just ignore.  */
415                                         PERROR("Cannot accept more clients");
416                                         closeTcpConnection(iConnFd);
417                                 } else {
418                                         int iOption, iFlags;
419
420                                         rc = complete_agent_connection(agent,
421                                                                        iConnFd);
422                                         if (rc != EN_LUTF_RC_OK) {
423                                                 release_agent_blk(agent, true);
424                                                 continue;
425                                         }
426
427                                         /* all nodes listen on the
428                                          * same port
429                                          */
430                                         agent->listen_port = info->listen_port;
431
432                                         /*  Add new client to our select mask.  */
433                                         FD_SET(iConnFd, &g_tAllSet);
434                                         g_iMaxSelectFd = get_highest_fd();
435
436                                         /* Ok, it seems that the connected socket gains
437                                          * the same flags as the listen socket.  We want
438                                          * to make it blocking here.
439                                          */
440                                         iFlags = fcntl(iConnFd, F_GETFL, 0);
441                                         fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
442
443                                         /*  And, we want to turn off Nagle's algorithm to
444                                          *  reduce latency
445                                          */
446                                         iOption = 1;
447                                         setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
448                                                    (void *)&iOption,
449                                                    sizeof(iOption));
450
451                                         PDEBUG("Received a connection from %s on FD %d\n",
452                                                inet_ntoa(agent->addr.sin_addr), iConnFd);
453                                 }
454                         }
455
456                         /*  See if there are other messages waiting.  */
457                         iNReady--;
458                 }
459
460                 /* need to iterate through the clients and see if a
461                  * message was sent to any of them
462                  */
463                 for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
464                         /* reset the return code to avoid misbehaving on previous
465                          * returns
466                          */
467                         rc = EN_LUTF_RC_OK;
468
469                         if ((agent = find_agent_blk_by_id(i))) {
470                                 int hb_fd = INVALID_TCP_SOCKET;
471                                 int rpc_fd = INVALID_TCP_SOCKET;
472
473                                 release_agent_blk(agent, false);
474
475                                 if (FD_ISSET(agent->iFileDesc, &tReadSet))
476                                         hb_fd = agent->iFileDesc;
477                                 if (FD_ISSET(agent->iRpcFd, &tReadSet))
478                                         rpc_fd = agent->iRpcFd;
479
480                                 if (hb_fd == INVALID_TCP_SOCKET &&
481                                     rpc_fd == INVALID_TCP_SOCKET)
482                                         continue;
483
484                                 /* process heart beat */
485                                 if (hb_fd != INVALID_TCP_SOCKET) {
486                                         /* process the message */
487                                         rc = process_agent_message(agent, hb_fd);
488                                         if (rc)
489                                                 PERROR("msg failure: %s",
490                                                        lutf_rc2str(rc));
491                                 }
492                                 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
493                                         if (agent->id == master->id) {
494                                                 PERROR("Disconnected from master. Will attempt to reconnect");
495                                                 master_connected = false;
496                                         }
497                                         release_agent_blk(agent, true);
498                                         continue;
499                                 }
500
501                                 /* process rpc */
502                                 if (rpc_fd != INVALID_TCP_SOCKET) {
503                                         /* process the message */
504                                         rc = process_agent_message(agent, rpc_fd);
505                                         if (rc)
506                                                 PERROR("msg failure: %s",
507                                                        lutf_rc2str(rc));
508                                 }
509                                 if (rc == EN_LUTF_RC_SOCKET_FAIL) {
510                                         if (agent->id == master->id) {
511                                                 PERROR("Disconnected from master. Will attempt to reconnect");
512                                                 master_connected = false;
513                                         }
514                                         release_agent_blk(agent, true);
515                                         continue;
516                                 }
517                         }
518                 }
519
520                 /* establish connection with the master if I'm an agent
521                  * and I have not connected to the master yet.
522                  * Otherwise send a heart beat
523                  */
524                 if (!master_connected &&
525                     strlen(g_lutf_cfg.master_name) != 0) {
526                         PDEBUG("Attempting a connection on master %s",
527                                g_lutf_cfg.master_name);
528                         master = find_free_agent_blk(&info->hb_info.master_address);
529                         if (!master) {
530                                 PERROR("Failed to allocate agent block");
531                                 continue;
532                         }
533
534                         iConnFd = establishTCPConnection(info->hb_info.master_address.sin_addr.s_addr,
535                                                          htons(info->hb_info.master_address.sin_port),
536                                                          true, false);
537                         if (iConnFd < 0) {
538                                 PERROR("establishTCPConnection failure: %s. Clearing set",
539                                         lutf_rc2str(iConnFd));
540                                 release_agent_blk(master, true);
541                                 PERROR("Disconnected from master. Will attempt to reconnect");
542                                 master_connected = false;
543                                 continue;
544                         }
545
546                         master->iFileDesc = iConnFd;
547                         memcpy(&master->addr,
548                                &info->hb_info.master_address,
549                                sizeof(master->addr));
550                         if (g_lutf_cfg.master_name) {
551                                 strncpy(master->name, g_lutf_cfg.master_name,
552                                         MAX_STR_LEN);
553                                 master->name[MAX_STR_LEN-1] = '\0';
554                         }
555                         master->node_type = EN_LUTF_MASTER;
556                         gethostname(master->hostname, MAX_STR_LEN);
557                         master->telnet_port = info->hb_info.agent_telnet_port;
558
559                         PDEBUG("Connected to master %s on fd %d",
560                                master->name, master->iFileDesc);
561
562                         /*
563                          * add the master FD to the select FD set
564                          * to be able to process master messages
565                          */
566                         FD_SET(iConnFd, &g_tAllSet);
567                         g_iMaxSelectFd = get_highest_fd();
568
569                         master_connected = true;
570                         master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
571                 }
572                 /*
573                 if (info->type == EN_LUTF_AGENT) {
574                         rc = send_hb(master, info->hb_info.node_name,
575                                      info->hb_info.agent_telnet_port,
576                                      info->type);
577                         if (rc != EN_LUTF_RC_OK) {
578                                 master_connected = false;
579                                 g_iMaxSelectFd = get_highest_fd();
580                         }
581                 }
582                 */
583                 /*
584                  * Get the time stamp and go through each agent
585                  * and see if it's still healthy.  For agents which
586                  * aren't healthy move off to the dead_list.
587                  * This operation is only valid if I'm a master
588                  */
589                 gettimeofday(&time_2, NULL);
590                 if (agent_get_hb() && info->type == EN_LUTF_MASTER) {
591                         /* check if HB_TO seconds has passed since the last
592                          * time we collected the time
593                          */
594                         if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
595                                 /* do the heartbeat check */
596                                 agent_hb_check(&time_1, info->type);
597                         }
598                 }
599
600                 if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
601                         lutf_agent_blk_t *agent = NULL;
602                         int idx = 0;
603
604                         do {
605                                 idx = get_next_active_agent(idx, &agent);
606                                 /* A master doesn't send a heart
607                                  * beat to himself
608                                  */
609                                 if (agent) {
610                                         bool dead = false;
611                                         if (info->type == EN_LUTF_MASTER &&
612                                             agent->id == master->id)
613                                                 continue;
614
615                                         rc = send_hb(agent, info->hb_info.node_name,
616                                                      info->hb_info.agent_telnet_port,
617                                                      info->type);
618                                         if (rc != EN_LUTF_RC_OK) {
619                                                 if (agent->id == master->id) {
620                                                         PERROR("Disconnected from master. Will attempt to reconnect");
621                                                         master_connected = false;
622                                                 }
623                                                 dead = true;
624                                         }
625                                         release_agent_blk(agent, dead);
626                                 }
627                         } while (agent);
628                 }
629
630                 /* store the current time */
631                 memcpy(&time_1, &time_2, sizeof(time_1));
632         }
633
634         /* Zero out the g_tAllSet */
635         FD_ZERO(&g_tAllSet);
636
637         return NULL;
638 }