Whamcloud - gitweb
LU-10973 lnet: initial LUTF C infrastructure
[fs/lustre-release.git] / lustre / tests / lutf / src / lutf_listener.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <netinet/tcp.h>
9 #include <arpa/inet.h>
10 #include <sys/time.h>
11 #include <pthread.h>
12 #include <string.h>
13 #include <signal.h>
14 #include "lutf.h"
15 #include "lutf_python.h"
16 #include "lutf_agent.h"
17 #include "lutf_message.h"
18 #include "lutf_listener.h"
19
20 static fd_set g_tAllSet;
21 static bool g_bShutdown = false;
22 static int g_iListenFd = INVALID_TCP_SOCKET;
23 bool g_agent_enable_hb = true;
24
25 typedef lutf_rc_t (*msg_process_fn_t)(char *msg, lutf_agent_blk_t *agent);
26
27 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent);
28 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent);
29 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent);
30
31 static msg_process_fn_t msg_process_tbl[EN_MSG_TYPE_MAX] = {
32         [EN_MSG_TYPE_HB] = process_msg_hb,
33         [EN_MSG_TYPE_GET_NUM_AGENTS] = process_msg_get_num_agents,
34         [EN_MSG_TYPE_RPC_REQUEST] = process_msg_rpc_request,
35 };
36
37 void lutf_listener_shutdown(void)
38 {
39         g_bShutdown = true;
40 }
41
42 int get_highest_fd(void)
43 {
44         int iAgentFd = lutf_agent_get_highest_fd();
45         int iMaxFd;
46
47         if (iAgentFd > g_iListenFd)
48                 iMaxFd = iAgentFd;
49         else
50                 iMaxFd = g_iListenFd;
51         PDEBUG("Current highest FD = %d", iMaxFd);
52
53         return iMaxFd;
54 }
55
56 static lutf_rc_t process_msg_rpc_request(char *msg, lutf_agent_blk_t *agent)
57 {
58         lutf_rc_t rc;
59
60         agent->state |= LUTF_AGENT_WORK_IN_PROGRESS;
61         rc = python_handle_rpc_request(msg);
62         agent->state &= ~LUTF_AGENT_WORK_IN_PROGRESS;
63
64         return rc;
65 }
66
67 static lutf_rc_t process_msg_hb(char *msg, lutf_agent_blk_t *agent)
68 {
69         lutf_msg_hb_t *hb = (lutf_msg_hb_t *)msg;
70         //PERROR("Procesing HB message");
71
72         /* endian convert message */
73         hb->telnet_port = ntohl(hb->telnet_port);
74         hb->node_type = ntohl(hb->node_type);
75
76         /* update the agent with the information */
77         agent->telnet_port = hb->telnet_port;
78         agent->node_type = hb->node_type;
79         strncpy(agent->hostname, hb->node_hostname, MAX_STR_LEN);
80         agent->hostname[MAX_STR_LEN-1] = '\0';
81         strncpy(agent->name, hb->node_name, MAX_STR_LEN);
82         agent->name[MAX_STR_LEN-1] = '\0';
83         gettimeofday(&agent->time_stamp, NULL);
84
85         return EN_LUTF_RC_OK;
86 }
87
88 static lutf_rc_t process_msg_get_num_agents(char *msg, lutf_agent_blk_t *agent)
89 {
90         lutf_rc_t rc;
91         lutf_msg_num_agents_query_t query;
92
93         query.num_agents = get_num_agents();
94         rc = sendTcpMessage(agent->iFileDesc, (char *)&query, sizeof(query));
95         if (rc) {
96                 PERROR("failed to send tcp message to get num agents query");
97                 return rc;
98         }
99
100         return EN_LUTF_RC_OK;
101 }
102
103 static lutf_rc_t process_agent_message(lutf_agent_blk_t *agent, int fd)
104 {
105         lutf_rc_t rc = EN_LUTF_RC_OK;
106         lutf_message_hdr_t hdr;
107         char *buffer;
108         msg_process_fn_t proc_fn;
109
110         /* get the header first */
111         rc = readTcpMessage(fd, (char *)&hdr, sizeof(hdr),
112                             TCP_READ_TIMEOUT_SEC);
113
114         if (rc)
115                 return rc;
116
117         hdr.version = ntohl(hdr.version);
118         if (hdr.version != LUTF_VERSION_NUMBER) {
119                 PERROR("version %d != %d", hdr.version,
120                        LUTF_VERSION_NUMBER);
121                 return EN_LUTF_RC_BAD_VERSION;
122         }
123
124         /* if the ips don't match ignore the message */
125         if (memcmp(&agent->addr.sin_addr, &hdr.ip, sizeof(hdr.ip)))
126                 return rc;
127
128         hdr.type = ntohl(hdr.type);
129         hdr.len = ntohl(hdr.len);
130
131         buffer = calloc(hdr.len, 1);
132         if (!buffer)
133                 return EN_LUTF_RC_OOM;
134
135         /* get the rest of the message */
136         rc = readTcpMessage(fd, buffer, hdr.len,
137                             TCP_READ_TIMEOUT_SEC);
138
139         if (rc) {
140                 free(buffer);
141                 return rc;
142         }
143
144         /* call the appropriate processing function */
145         proc_fn = msg_process_tbl[hdr.type];
146         if (proc_fn)
147                 rc = proc_fn(buffer, agent);
148
149         free(buffer);
150         return rc;
151 }
152
153 static lutf_rc_t init_comm(unsigned short server_port)
154 {
155         int iFlags;
156         struct sockaddr_in sServAddr;
157
158         signal(SIGPIPE, SIG_IGN);
159
160         /*  Create a socket to listen to.  */
161         g_iListenFd = socket(AF_INET, SOCK_STREAM, 0);
162         if (g_iListenFd < 0) {
163                 /*  Cannot create a listening socket.  */
164                 return EN_LUTF_RC_SOCKET_FAIL;
165         }
166
167         /* Set a socket option which will allow us to be quickly restarted
168          * if necessary.
169          */
170         iFlags = 1;
171         if (setsockopt(g_iListenFd, SOL_SOCKET, SO_REUSEADDR, (void *) &iFlags,
172                        sizeof(iFlags)) < 0) {
173                 /*  Cannot change the socket options.  */
174                 closeTcpConnection(g_iListenFd);
175                 return EN_LUTF_RC_FAIL;
176         }
177
178         /*  Bind to our listening socket.  */
179         bzero((char *) &sServAddr, sizeof(sServAddr));
180         sServAddr.sin_family = AF_INET;
181         sServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
182         sServAddr.sin_port = htons(server_port);
183
184         if (bind(g_iListenFd, (struct sockaddr *) &sServAddr, sizeof(sServAddr)) < 0) {
185                 /*  Cannot bind our listening socket.  */
186                 closeTcpConnection(g_iListenFd);
187                 return EN_LUTF_RC_BIND_FAILED;
188         }
189
190         /*  Let the system know we wish to listen to this port for
191          *  connections. */
192         if (listen(g_iListenFd, 2) < 0) {
193                 /*  Cannot listen to socket, close and fail  */
194                 closeTcpConnection(g_iListenFd);
195                 return EN_LUTF_RC_LISTEN_FAILED;
196         }
197
198         /* We want this socket to be non-blocking even though it will be used
199          * in a blocking select call. This is to avoid a problem identified by
200          * Richard Stevens.
201          */
202         iFlags = fcntl(g_iListenFd, F_GETFL, 0);
203         fcntl(g_iListenFd, F_SETFL, iFlags | O_NONBLOCK);
204
205         /*  Add the listening socket to our select() mask.  */
206         FD_ZERO(&g_tAllSet);
207         FD_SET(g_iListenFd, &g_tAllSet);
208
209         return EN_LUTF_RC_OK;
210 }
211
212 static inline int close_agent_connection(lutf_agent_blk_t *agent)
213 {
214         FD_CLR(agent->iFileDesc, &g_tAllSet);
215         FD_CLR(agent->iRpcFd, &g_tAllSet);
216         closeTcpConnection(agent->iRpcFd);
217         closeTcpConnection(agent->iFileDesc);
218         agent->state &=
219                 ~LUTF_AGENT_RPC_CHANNEL_CONNECTED;
220         agent->state &=
221                 ~LUTF_AGENT_HB_CHANNEL_CONNECTED;
222         return get_highest_fd();
223 }
224
225 lutf_rc_t send_hb(lutf_agent_blk_t *agent, char *name, int telnet_port,
226                   int type)
227 {
228         lutf_msg_hb_t hb;
229         int rc;
230
231         hb.telnet_port = htonl(telnet_port);
232         hb.node_type = htonl(type);
233         strncpy(hb.node_name, name, MAX_STR_LEN);
234         hb.node_name[MAX_STR_LEN-1] = '\0';
235         gethostname(hb.node_hostname, MAX_STR_LEN);
236
237         /* send the heart beat */
238         rc = lutf_send_msg(agent->iFileDesc, (char *)&hb,
239                            sizeof(hb), EN_MSG_TYPE_HB);
240         if (rc != EN_LUTF_RC_OK) {
241                 PERROR("Failed to send heart beat %s\n",
242                         lutf_rc2str(rc));
243         }
244
245         return rc;
246 }
247
248 lutf_rc_t complete_agent_connection(lutf_agent_blk_t *agent, int fd)
249 {
250         /* we assume the first connection is an HB connection */
251         if (!(agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED)) {
252                 if (agent->iFileDesc != INVALID_TCP_SOCKET) {
253                         PERROR("agent in unexpected state. "
254                                "state is %s, but HB FD is %d",
255                                agent_state2str(agent), fd);
256                         return EN_LUTF_RC_SYS_ERR;
257                 } else {
258                         PDEBUG("HB Channel Connected: %d", fd);
259                         agent->iFileDesc = fd;
260                         agent->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
261                         return EN_LUTF_RC_OK;
262                 }
263         } else if (!(agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
264                 if (agent->iRpcFd != INVALID_TCP_SOCKET) {
265                         PERROR("agent in unexpected state. "
266                                "state is %s, but RPC FD is %d",
267                                agent_state2str(agent), fd);
268                         return EN_LUTF_RC_SYS_ERR;
269                 } else {
270                         PDEBUG("RPC Channel Connected: %d", fd);
271                         agent->iRpcFd = fd;
272                         agent->state |= LUTF_AGENT_RPC_CHANNEL_CONNECTED;
273                         return EN_LUTF_RC_OK;
274                 }
275         }
276
277         PERROR("agent is in an unexpected state on connection %s",
278                agent_state2str(agent));
279         return EN_LUTF_RC_SYS_ERR;
280 }
281
282 /*
283  * lutf_listener_main
284  *   main loop.  Listens for incoming agent connections, and for agent
285  *   messages.  Every period of time it triggers a walk through the agent
286  *   list to see if any of the HBs stopped
287  *
288  *   If I am an Agent, then attempt to connect to the master and add an
289  *   agent block on the list of agents. After successful connection send
290  *   a regular heart beat.
291  *
292  *   Since the master's agent block is on the list of agents and its FD is
293  *   on the select FD set, then if the master sends the agent a message
294  *   the agent should be able to process it.
295  */
296 void *lutf_listener_main(void *usr_data)
297 {
298         int iConnFd;
299         struct sockaddr_in sCliAddr;
300         socklen_t  tCliLen;
301         fd_set tReadSet;
302         int iNReady;
303         int iMaxSelectFd;
304         int i;
305         lutf_rc_t rc;
306         lutf_agent_blk_t *agent = NULL, *master = NULL;
307         struct timeval time_1, time_2, select_to;
308         lutf_listener_info_t *info;
309         bool master_connected = false;
310
311         info = (lutf_listener_info_t *)usr_data;
312         if ((!info) ||
313             ((info) && (info->listen_port == 0))) {
314                 PERROR("No liston port provided");
315                 return NULL;
316         }
317
318         rc = init_comm(info->listen_port);
319         if (rc) {
320                 PERROR("init_comm failed: %s", lutf_rc2str(rc));
321                 return NULL;
322         }
323
324         agent_init();
325
326         iMaxSelectFd = g_iListenFd;
327
328         gettimeofday(&time_1, NULL);
329
330
331         /*  Main Processing Loop: Keep going until we have reason to shutdown. */
332         while (!g_bShutdown) {
333                 /*  Wait on our select mask for an event to occur.  */
334                 tReadSet = g_tAllSet;
335
336                 select_to.tv_sec = HB_TO;
337                 select_to.tv_usec = 0;
338
339                 iNReady = select(iMaxSelectFd + 1, &tReadSet, NULL, NULL, &select_to);
340
341                 /*  Determine if we failed the select call */
342                 if (iNReady < 0) {
343                         /*  Check to see if we were interrupted by a signal.  */
344                         if ((errno == EINTR) || (errno == EAGAIN)) {
345                                 PERROR("Select failure: errno = %d", errno);
346                         } else {
347                                 /*  If this is an ECONNABORTED error, just ignore it.  */
348                                 if (errno != ECONNABORTED) {
349                                         /* Raise a fatal alarm.  */
350                                         /* Shut down */
351                                         PERROR("Shutting down Listener thread. errno: %d", errno);
352                                         g_bShutdown = true;
353                                 }
354                         }
355                 } else {
356                         if (FD_ISSET(g_iListenFd, &tReadSet)) {
357                                 /*  A new client is trying to connect.  */
358                                 tCliLen = sizeof(sCliAddr);
359                                 iConnFd = accept(g_iListenFd, (struct sockaddr *) &sCliAddr,
360                                                  &tCliLen);
361                                 if (iConnFd < 0) {
362                                         /*  Cannot accept new connection...just ignore.  */
363                                         if (errno != EWOULDBLOCK)
364                                                 PERROR("Error on accept(), errno = %d", errno);
365                                 } else {
366                                         /* Try to see if we have an agent
367                                          * with the same address, since
368                                          * agents can have multiple tcp
369                                          * connections open
370                                          */
371                                         agent = find_create_agent_blk_by_addr(&sCliAddr);
372                                         if (!agent) {
373                                                 /*  Cannot support more clients...just ignore.  */
374                                                 PERROR("Cannot accept more clients");
375                                                 closeTcpConnection(iConnFd);
376                                         } else {
377                                                 int iOption, iFlags;
378
379                                                 rc = complete_agent_connection(agent,
380                                                                 iConnFd);
381                                                 if (rc != EN_LUTF_RC_OK) {
382                                                         int agent_id = agent->id;
383                                                         iMaxSelectFd = close_agent_connection(agent);
384                                                         release_agent_blk(agent);
385                                                         free_agent_blk(agent_id);
386                                                         continue;
387                                                 }
388
389                                                 /* all nodes listen on the
390                                                  * same port
391                                                  */
392                                                 agent->listen_port = info->listen_port;
393
394                                                 /*  Add new client to our select mask.  */
395                                                 FD_SET(iConnFd, &g_tAllSet);
396                                                 iMaxSelectFd = get_highest_fd();
397
398                                                 /* Ok, it seems that the connected socket gains
399                                                  * the same flags as the listen socket.  We want
400                                                  * to make it blocking here.
401                                                  */
402                                                 iFlags = fcntl(iConnFd, F_GETFL, 0);
403                                                 fcntl(iConnFd, F_SETFL, iFlags & (~O_NONBLOCK));
404
405                                                 /*  And, we want to turn off Nagle's algorithm to
406                                                  *  reduce latency
407                                                  */
408                                                 iOption = 1;
409                                                 setsockopt(iConnFd, IPPROTO_TCP, TCP_NODELAY,
410                                                            (void *)&iOption,
411                                                            sizeof(iOption));
412
413                                                 PDEBUG("Received a connection from %s on FD %d\n",
414                                                        inet_ntoa(agent->addr.sin_addr), iConnFd);
415
416                                                 release_agent_blk(agent);
417                                         }
418                                 }
419
420                                 /*  See if there are other messages waiting.  */
421                                 iNReady--;
422                         }
423
424                         /* need to iterate through the clients and see if a
425                          * message was sent to any of them
426                          */
427                         for (i = 0; ((i < MAX_NUM_AGENTS) && (iNReady > 0)); i++) {
428                                 /* reset the return code to avoid misbehaving on previous
429                                  * returns
430                                  */
431                                 rc = EN_LUTF_RC_OK;
432
433                                 if ((agent = find_agent_blk_by_id(i))) {
434                                         int hb_fd = INVALID_TCP_SOCKET;
435                                         int rpc_fd = INVALID_TCP_SOCKET;
436
437                                         if (FD_ISSET(agent->iFileDesc, &tReadSet))
438                                                 hb_fd = agent->iFileDesc;
439                                         if (FD_ISSET(agent->iRpcFd, &tReadSet))
440                                                 rpc_fd = agent->iRpcFd;
441
442                                         if (hb_fd == INVALID_TCP_SOCKET &&
443                                             rpc_fd == INVALID_TCP_SOCKET)
444                                                 continue;
445
446                                         /* process heart beat */
447                                         if (hb_fd != INVALID_TCP_SOCKET) {
448                                                 /* process the message */
449                                                 rc = process_agent_message(agent, hb_fd);
450                                                 if (rc)
451                                                         PERROR("msg failure: %s",
452                                                                lutf_rc2str(rc));
453                                         }
454                                         if (rc == EN_LUTF_RC_SOCKET_FAIL) {
455                                                 int agent_id = agent->id;
456                                                 if (agent->id == master->id) {
457                                                         PERROR("Disconnected from master. Will attempt to reconnect");
458                                                         master_connected = false;
459                                                 }
460                                                 iMaxSelectFd = close_agent_connection(agent);
461                                                 release_agent_blk(agent);
462                                                 free_agent_blk(agent_id);
463                                                 continue;
464                                         }
465
466                                         /* process rpc */
467                                         if (rpc_fd != INVALID_TCP_SOCKET) {
468                                                 /* process the message */
469                                                 rc = process_agent_message(agent, rpc_fd);
470                                                 if (rc)
471                                                         PERROR("msg failure: %s",
472                                                                lutf_rc2str(rc));
473                                         }
474                                         if (rc == EN_LUTF_RC_SOCKET_FAIL) {
475                                                 int agent_id = agent->id;
476                                                 if (agent->id == master->id) {
477                                                         PERROR("Disconnected from master. Will attempt to reconnect");
478                                                         master_connected = false;
479                                                 }
480                                                 iMaxSelectFd = close_agent_connection(agent);
481                                                 release_agent_blk(agent);
482                                                 free_agent_blk(agent_id);
483                                                 continue;
484                                         }
485                                         release_agent_blk(agent);
486                                 }
487                         }
488
489                         /* establish connection with the master if I'm an agent
490                          * and I have not connected to the master yet.
491                          * Otherwise send a heart beat
492                          */
493                         if (!master_connected &&
494                             strlen(g_lutf_cfg.master_name) != 0) {
495                                 PDEBUG("Attempting a connection on master %s",
496                                        g_lutf_cfg.master_name);
497                                 master = find_free_agent_blk(&info->hb_info.master_address);
498                                 if (!master) {
499                                         PERROR("Failed to allocate agent block");
500                                         continue;
501                                 }
502
503                                 iConnFd = establishTCPConnection(
504                                         info->hb_info.master_address.sin_addr.s_addr,
505                                         htons(info->hb_info.master_address.sin_port),
506                                         true, false);
507
508                                 if (iConnFd < 0) {
509                                         int master_id = master->id;
510
511                                         PERROR("establishTCPConnection failure: %s. Clearing set",
512                                                 lutf_rc2str(iConnFd));
513                                         iMaxSelectFd = close_agent_connection(master);
514                                         release_agent_blk(master);
515                                         free_agent_blk(master_id);
516                                         PERROR("Disconnected from master. Will attempt to reconnect");
517                                         master_connected = false;
518                                         continue;
519                                 }
520
521                                 master->iFileDesc = iConnFd;
522                                 memcpy(&master->addr,
523                                        &info->hb_info.master_address,
524                                        sizeof(master->addr));
525                                 strncpy(master->name, g_lutf_cfg.master_name,
526                                         MAX_STR_LEN);
527                                 master->name[MAX_STR_LEN-1] = '\0';
528                                 master->node_type = EN_LUTF_MASTER;
529                                 gethostname(master->hostname, MAX_STR_LEN);
530                                 master->telnet_port = info->hb_info.agent_telnet_port;
531                                 release_agent_blk(master);
532
533                                 PDEBUG("Connected to master %s on fd %d",
534                                        master->name, master->iFileDesc);
535
536                                 /*
537                                  * add the master FD to the select FD set
538                                  * to be able to process master messages
539                                  */
540                                 FD_SET(iConnFd, &g_tAllSet);
541                                 iMaxSelectFd = get_highest_fd();
542
543                                 master_connected = true;
544                                 master->state |= LUTF_AGENT_HB_CHANNEL_CONNECTED;
545                         }
546 /*
547                         if (info->type == EN_LUTF_AGENT) {
548                                 rc = send_hb(master, info->hb_info.node_name,
549                                              info->hb_info.agent_telnet_port,
550                                              info->type);
551                                 if (rc != EN_LUTF_RC_OK) {
552                                         master_connected = false;
553                                         iMaxSelectFd = get_highest_fd();
554                                 }
555                         }
556 */
557                         /*
558                          * Get the time stamp and go through each agent
559                          * and see if it's still healthy.  For agents which
560                          * aren't healthy move off to the dead_list.
561                          * This operation is only valid if I'm a master
562                          */
563                         gettimeofday(&time_2, NULL);
564                         if (g_agent_enable_hb && info->type == EN_LUTF_MASTER) {
565                                 /* check if HB_TO seconds has passed since the last
566                                  * time we collected the time */
567                                 if (time_2.tv_sec - time_1.tv_sec >= HB_TO * 100) {
568
569                                         /* do the heartbeat check */
570                                         agent_hb_check(&time_1, info->type);
571                                 }
572                         }
573
574                         if (time_2.tv_sec - time_1.tv_sec >= HB_TO) {
575                                 lutf_agent_blk_t *agent = NULL;
576                                 int idx = 0;
577
578                                 do {
579                                         idx = get_next_active_agent(idx, &agent);
580                                         /* A master doesn't send a heart
581                                          * beat to himself */
582                                         if (agent) {
583                                                 if (info->type == EN_LUTF_MASTER &&
584                                                     agent->id == master->id)
585                                                         continue;
586                                                 int agent_id = agent->id;
587                                                 rc = send_hb(agent, info->hb_info.node_name,
588                                                              info->hb_info.agent_telnet_port,
589                                                              info->type);
590                                                 if (rc != EN_LUTF_RC_OK) {
591                                                         if (agent->id == master->id) {
592                                                                 PERROR("Disconnected from master. Will attempt to reconnect");
593                                                                 master_connected = false;
594                                                         }
595                                                         iMaxSelectFd = close_agent_connection(agent);
596                                                         release_agent_blk(agent);
597                                                         free_agent_blk(agent_id);
598                                                 } else {
599                                                         release_agent_blk(agent);
600                                                 }
601                                         }
602                                 } while (agent);
603                         }
604                 }
605                 /* store the current time */
606                 time_1 = time_2;
607         }
608
609         /* Zero out the g_tAllSet */
610         FD_ZERO(&g_tAllSet);
611
612         return NULL;
613 }