Whamcloud - gitweb
LU-10973 lnet: LUTF infrastructure updates
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_agent.c
1 #include <sys/socket.h>
2 #include <sys/time.h>
3 #include <netinet/in.h>
4 #include <arpa/inet.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <unistd.h>
9 #include <assert.h>
10 #include "cyaml.h"
11 #include "lutf_agent.h"
12 #include "lutf.h"
13 #include "lutf_python.h"
14 #include "lutf_listener.h"
15
16 static pthread_mutex_t agent_array_mutex;
17 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
18 static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
19 /* TODO: this is probably not thread safe */
20 static char agent_state_str[128];
21
22 static bool g_agent_enable_hb = true;
23 static struct in_addr g_local_ip;
24
25 #define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
26
27 #define MUTEX_LOCK(x) \
28   pthread_mutex_lock(x)
29
30 #define MUTEX_UNLOCK(x) \
31   pthread_mutex_unlock(x)
32
33 char *get_local_ip()
34 {
35         return inet_ntoa(g_local_ip);
36 }
37
38 static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
39 {
40         int i = 0;
41
42         for (i = 0; i < MAX_NUM_AGENTS; i++) {
43                 if (agent_dead_list[i] == NULL) {
44                         agent->state |= LUTF_AGENT_STATE_DEAD;
45                         agent_dead_list[i] = agent;
46                         agent->id = i;
47                         break;
48                 }
49         }
50         assert(i < MAX_NUM_AGENTS);
51 }
52
53 static void del_dead_agent_locked(lutf_agent_blk_t *agent)
54 {
55         assert(agent &&
56                agent->state & LUTF_AGENT_STATE_DEAD &&
57                agent_dead_list[agent->id] != NULL &&
58                agent_dead_list[agent->id] == agent);
59
60         assert(agent->ref_count > 0);
61         agent->ref_count--;
62
63         if (agent->ref_count == 0) {
64                 agent_dead_list[agent->id] = NULL;
65                 memset(agent, 0xdeadbeef, sizeof(*agent));
66                 free(agent);
67         }
68 }
69
70 void release_dead_list_agents(void)
71 {
72         int i;
73
74         MUTEX_LOCK(&agent_array_mutex);
75         for (i = 0; i < MAX_NUM_AGENTS; i++) {
76                 lutf_agent_blk_t *agent;
77
78                 agent = agent_dead_list[i];
79
80                 if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) {
81                         agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
82                         del_dead_agent_locked(agent);
83                 }
84         }
85         MUTEX_UNLOCK(&agent_array_mutex);
86 }
87
88 static inline bool agent_alive(lutf_agent_blk_t *agent)
89 {
90         bool viable = false;
91
92         MUTEX_LOCK(&agent->mutex);
93         if (agent->state & LUTF_AGENT_STATE_ALIVE)
94                 viable = true;
95         MUTEX_UNLOCK(&agent->mutex);
96
97         return viable;
98 }
99
100 void release_agent_blk(lutf_agent_blk_t *agent, int dead)
101 {
102         assert(agent);
103
104         MUTEX_LOCK(&agent_array_mutex);
105         MUTEX_LOCK(&agent->mutex);
106
107         if (agent->state & LUTF_AGENT_STATE_ALIVE) {
108                 /* sanity check */
109                 assert(agent_live_list[agent->id] != NULL &&
110                        agent_live_list[agent->id] == agent);
111         } else {
112                 MUTEX_UNLOCK(&agent->mutex);
113                 del_dead_agent_locked(agent);
114                 MUTEX_UNLOCK(&agent_array_mutex);
115                 return;
116         }
117
118         assert(agent->ref_count > 0);
119         agent->ref_count--;
120
121         if (agent->ref_count == 0) {
122                 agent_live_list[agent->id] = NULL;
123                 assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
124                 MUTEX_UNLOCK(&agent->mutex);
125                 MUTEX_UNLOCK(&agent_array_mutex);
126                 close_agent_connection(agent);
127                 memset(agent, 0xdeadbeef, sizeof(*agent));
128                 /* free the block */
129                 free(agent);
130         } else if (dead) {
131                 agent_live_list[agent->id] = NULL;
132                 insert_dead_agent_locked(agent);
133                 MUTEX_UNLOCK(&agent->mutex);
134                 MUTEX_UNLOCK(&agent_array_mutex);
135                 unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
136                 unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED);
137                 unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED);
138                 close_agent_connection(agent);
139         } else {
140                 MUTEX_UNLOCK(&agent->mutex);
141                 MUTEX_UNLOCK(&agent_array_mutex);
142         }
143 }
144
145 void acquire_agent_blk(lutf_agent_blk_t *agent)
146 {
147         /* acquire the agent blk mutex */
148         MUTEX_LOCK(&agent->mutex);
149         if (agent)
150                 agent->ref_count++;
151         MUTEX_UNLOCK(&agent->mutex);
152 }
153
154 char *agent_state2str(lutf_agent_blk_t *agent)
155 {
156         if (!agent)
157                 return "NULL PARAMETER";
158
159         sprintf(agent_state_str, "%s%s%s%s",
160                 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
161                 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
162                 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
163                 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
164
165         return agent_state_str;
166 }
167
168 static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
169 {
170         int i;
171         lutf_agent_blk_t *agent;
172
173         if (!addr)
174                 return NULL;
175
176         MUTEX_LOCK(&agent_array_mutex);
177         for (i = 0; i < MAX_NUM_AGENTS; i++) {
178                 agent = agent_live_list[i];
179                 if ((agent) && agent_alive(agent) &&
180                     (agent->addr.sin_addr.s_addr ==
181                      addr->sin_addr.s_addr)) {
182                         acquire_agent_blk(agent);
183                         MUTEX_UNLOCK(&agent_array_mutex);
184                         return agent;
185                 }
186         }
187         MUTEX_UNLOCK(&agent_array_mutex);
188
189         return NULL;
190 }
191
192 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
193 {
194         int i = idx;
195         lutf_agent_blk_t *agent = NULL;
196
197         if (idx >= MAX_NUM_AGENTS)
198                 goto out;
199
200         MUTEX_LOCK(&agent_array_mutex);
201         for (i = idx; i < MAX_NUM_AGENTS; i++) {
202                 agent = agent_live_list[i];
203                 if (agent && agent_alive(agent)) {
204                         i++;
205                         acquire_agent_blk(agent);
206                         break;
207                 }
208         }
209         MUTEX_UNLOCK(&agent_array_mutex);
210
211 out:
212         *out = agent;
213
214         return i;
215 }
216
217 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
218 {
219         lutf_agent_blk_t *agent;
220
221         agent = find_agent_blk_by_addr(addr);
222         if (!agent)
223                 return find_free_agent_blk(addr);
224         release_agent_blk(agent, false);
225
226         return agent;
227 }
228
229 int lutf_agent_get_highest_fd(void)
230 {
231         lutf_agent_blk_t *agent;
232         int iMaxFd = INVALID_TCP_SOCKET;
233         int i;
234
235         MUTEX_LOCK(&agent_array_mutex);
236         for (i = 0; i < MAX_NUM_AGENTS; i++) {
237                 agent = agent_live_list[i];
238                 if (agent) {
239                         if (agent->iFileDesc > iMaxFd)
240                                 iMaxFd = agent->iFileDesc;
241                         if (agent->iRpcFd > iMaxFd)
242                                 iMaxFd = agent->iRpcFd;
243                 }
244         }
245         MUTEX_UNLOCK(&agent_array_mutex);
246
247         return iMaxFd;
248 }
249
250 void agent_disable_hb(void)
251 {
252         g_agent_enable_hb = false;
253 }
254
255 void agent_enable_hb(void)
256 {
257         g_agent_enable_hb = true;
258 }
259
260 int agent_get_hb(void)
261 {
262         return g_agent_enable_hb;
263 }
264
265 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
266 {
267         int i = 0;
268         lutf_agent_blk_t *agent;
269
270         /* grab the lock for the array */
271         MUTEX_LOCK(&agent_array_mutex);
272
273         /* iterate through the array to find a free entry */
274         for (i = 0; i < MAX_NUM_AGENTS; i++) {
275                 if (agent_live_list[i] == NULL)
276                         break;
277         }
278
279         if (i >= MAX_NUM_AGENTS) {
280                 MUTEX_UNLOCK(&agent_array_mutex);
281                 return NULL;
282         }
283
284         /* allocate a new agent blk and assign it to that entry */
285         agent = calloc(sizeof(char),
286                 sizeof(lutf_agent_blk_t));
287         if (!agent) {
288                 MUTEX_UNLOCK(&agent_array_mutex);
289                 return NULL;
290         }
291
292         gettimeofday(&agent->time_stamp, NULL);
293         agent->id = i;
294         agent->iFileDesc = INVALID_TCP_SOCKET;
295         agent->iRpcFd = INVALID_TCP_SOCKET;
296         agent->addr = *addr;
297         set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
298
299         pthread_mutex_init(&agent->mutex, NULL);
300         acquire_agent_blk(agent);
301
302         /* assign to array */
303         agent_live_list[i] = agent;
304
305         /* release the array mutex */
306         MUTEX_UNLOCK(&agent_array_mutex);
307
308         /* return the agent blk */
309         return agent;
310 }
311
312 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
313 {
314         lutf_agent_blk_t *agent;
315
316         if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
317                 return NULL;
318
319         /* grab the array mutex */
320         MUTEX_LOCK(&agent_array_mutex);
321
322         /* if the blk is non null grab the mutex.
323          * possibly block until previous user is done
324          */
325         if (agent_live_list[idx] == NULL) {
326                 MUTEX_UNLOCK(&agent_array_mutex);
327                 return NULL;
328         }
329
330         agent = agent_live_list[idx];
331
332         if (agent_alive(agent))
333                 acquire_agent_blk(agent);
334         else
335                 agent = NULL;
336
337         /* release the array mutex */
338         MUTEX_UNLOCK(&agent_array_mutex);
339
340         /* return the agent blk */
341         return agent;
342 }
343
344 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
345 {
346         MUTEX_LOCK(&agent->mutex);
347         agent->state |= state;
348         MUTEX_UNLOCK(&agent->mutex);
349 }
350
351 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
352 {
353         MUTEX_LOCK(&agent->mutex);
354         agent->state &= ~state;
355         MUTEX_UNLOCK(&agent->mutex);
356 }
357
358 char *agent_ip2str(lutf_agent_blk_t *agent)
359 {
360         if (!agent)
361                 return NULL;
362
363         return inet_ntoa(agent->addr.sin_addr);
364 }
365
366 int get_num_agents(void)
367 {
368         int i;
369         int num = 0;
370
371         for (i = 0; i < MAX_NUM_AGENTS; i++) {
372                 if (agent_live_list[i] != NULL)
373                         num++;
374         }
375
376         return num;
377 }
378
379 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
380 {
381         lutf_agent_blk_t *agent;
382         int i;
383
384         if (!name)
385                 return NULL;
386
387         MUTEX_LOCK(&agent_array_mutex);
388
389         for (i = 0; i < MAX_NUM_AGENTS; i++) {
390                 agent = agent_live_list[i];
391                 if ((agent) && agent_alive(agent) &&
392                     ((strcmp(agent->name, name) == 0) ||
393                      (strcmp(name, TEST_ROLE_GRC) == 0))) {
394                         acquire_agent_blk(agent);
395                         break;
396                 } else {
397                         agent = NULL;
398                 }
399         }
400
401         MUTEX_UNLOCK(&agent_array_mutex);
402
403         /* return the agent blk */
404         return agent;
405 }
406
407 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
408 {
409         lutf_agent_blk_t *agent;
410         int i;
411         struct sockaddr_in addr;
412
413         if (!ip)
414                 return NULL;
415
416         inet_aton(ip, &addr.sin_addr);
417
418         /* grab the array mutex */
419         MUTEX_LOCK(&agent_array_mutex);
420
421         for (i = 0; i < MAX_NUM_AGENTS; i++) {
422                 agent = agent_live_list[i];
423                 if ((agent) && agent_alive(agent) &&
424                     (agent->addr.sin_addr.s_addr ==
425                                 addr.sin_addr.s_addr))
426                         break;
427                 else
428                         agent = NULL;
429         }
430
431         if (agent)
432                 acquire_agent_blk(agent);
433
434         /* release the array mutex */
435         MUTEX_UNLOCK(&agent_array_mutex);
436
437         /* return the agent blk */
438         return agent;
439 }
440
441 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
442 {
443         struct timeval start;
444         struct timeval now;
445         struct cYAML *a;
446         bool found = false;
447         lutf_agent_blk_t *agent;
448
449         gettimeofday(&start, NULL);
450         gettimeofday(&now, NULL);
451
452         if (!agents) {
453                 PDEBUG("No agent to wait for");
454                 return EN_LUTF_RC_OK;
455         }
456
457         PDEBUG("Start waiting for Agents");
458
459         while (now.tv_sec - start.tv_sec < timeout && !found) {
460                 found = true;
461                 PDEBUG("Waiting for Agents");
462                 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
463                         PDEBUG("Looking up: %s", a->cy_valuestring);
464                         agent = find_agent_blk_by_name(a->cy_valuestring);
465                         if (agent) {
466                                 PDEBUG("agent %s found\n", agent->name);
467                                 release_agent_blk(agent, false);
468                         } else {
469                                 found = false;
470                                 break;
471                         }
472                 }
473                 if (!found)
474                         sleep(1);
475                 gettimeofday(&now, NULL);
476         }
477
478         return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
479 }
480
481 int get_num_agents_remote(char *masterIP, int masterPort)
482 {
483         lutf_rc_t rc;
484         lutf_msg_num_agents_query_t msg;
485         lutf_msg_num_agents_query_t *msg_p;
486         lutf_message_hdr_t hdr;
487         lutf_message_hdr_t *hdr_p;
488         int remoteSocket = INVALID_TCP_SOCKET;
489         struct in_addr addr;
490         char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
491
492         if (!recvBuf) {
493                 PERROR("out of memory");
494                 rc = EN_LUTF_RC_FAIL;
495                 goto out;
496         }
497
498         if (!inet_aton(masterIP, &addr)) {
499                 PERROR("bad master IP = %s", masterIP);
500                 rc = EN_LUTF_RC_FAIL;
501                 goto out;
502         }
503
504         /* in network byte order, convert so we can have a
505          * uniform API
506          */
507         remoteSocket = establishTCPConnection(addr.s_addr,
508                                                 htons(masterPort),
509                                                 false, false);
510         if (remoteSocket < 0) {
511                 PERROR("establishTCPConnection failure: %s",
512                        lutf_rc2str(remoteSocket));
513                 rc = remoteSocket;
514                 goto out;
515         }
516
517         rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
518         if (rc)
519                 goto out;
520
521         rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
522                             TCP_READ_TIMEOUT_SEC);
523         if (rc) {
524                 PERROR("failed to receive response");
525                 goto out;
526         }
527
528         hdr_p = (lutf_message_hdr_t *)recvBuf;
529         msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
530
531         if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
532                 PERROR("Unexpected message. Waiting for num agents received %d",
533                        hdr_p->type);
534                 rc = EN_LUTF_RC_FAIL;
535                 goto out;
536         }
537
538         rc = msg_p->num_agents;
539
540 out:
541         closeTcpConnection(remoteSocket);
542         free(recvBuf);
543         return rc;
544 }
545
546 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
547 {
548         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
549         lutf_agent_blk_t *agent_blk;
550         char *default_rsp;
551         lutf_message_hdr_t hdr;
552         char *recvBuf = NULL;
553         int msg_size;
554
555         if (!agent || !yaml || !rsp)
556                 goto fail_rpc;
557
558         msg_size = strlen(yaml) + 1;
559
560         PDEBUG("sending rpc request\n%s", yaml);
561
562         agent_blk = find_agent_blk_by_name(agent);
563         if (!agent_blk) {
564                 PERROR("Can't find agent with name: %s", agent);
565                 goto fail_rpc_no_agent;
566         }
567
568         MUTEX_LOCK(&agent_blk->mutex);
569         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
570                 MUTEX_UNLOCK(&agent_blk->mutex);
571                 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
572                        agent_blk->name,
573                        inet_ntoa(agent_blk->addr.sin_addr),
574                        agent_blk->listen_port);
575                 /* in network byte order, convert so we can have a
576                  * uniform API
577                  */
578                 agent_blk->iRpcFd = establishTCPConnection(
579                                 agent_blk->addr.sin_addr.s_addr,
580                                 htons(agent_blk->listen_port),
581                                 false, false);
582                 if (agent_blk->iRpcFd < 0)
583                         goto fail_rpc;
584                 set_agent_state(agent_blk,
585                                 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
586         } else {
587                 MUTEX_UNLOCK(&agent_blk->mutex);
588         }
589
590         set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
591
592         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
593                            EN_MSG_TYPE_RPC_REQUEST);
594         if (rc != EN_LUTF_RC_OK) {
595                 PERROR("Failed to send rpc message: %s", yaml);
596                 goto fail_rpc;
597         }
598
599         /* wait for the response */
600         rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
601                             sizeof(hdr), timeout);
602         if (rc != EN_LUTF_RC_OK) {
603                 PERROR("Failed to recv rpc header in timeout %d",
604                        timeout);
605                 goto fail_rpc;
606         }
607
608         if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
609             ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
610                 PERROR("Bad response. version %d, type:%d\n",
611                        hdr.type, hdr.version);
612                 goto fail_rpc;
613         }
614
615         recvBuf = calloc(ntohl(hdr.len), 1);
616         if (!recvBuf) {
617                 PERROR("Failed to allocate buffer to recv rpc response");
618                 goto fail_rpc;
619         }
620
621         rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
622                             timeout);
623         if (rc != EN_LUTF_RC_OK) {
624                 PERROR("Failed to recv rpc body in timeout %d", timeout);
625                 goto fail_rpc;
626         }
627
628         /*
629          * once recvBuf is given back to the caller, it's expected that
630          * the caller will manage the memory and free when done. This is
631          * mainly called from python. The SWIG wrapper frees the memory
632          * appropriately.
633          */
634         *rsp = recvBuf;
635         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
636         release_agent_blk(agent_blk, false);
637
638         return EN_LUTF_RC_OK;
639
640 fail_rpc:
641         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
642         set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
643         release_agent_blk(agent_blk, true);
644         if (recvBuf)
645                 free(recvBuf);
646         msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
647                 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
648 fail_rpc_no_agent:
649         default_rsp = calloc(msg_size, 1);
650         if (!default_rsp) {
651                 PERROR("Failed to allocate buffer for default response");
652                 *rsp = NULL;
653         } else {
654                 /* the source for the response would be the agent we sent
655                  * to and the destination is me
656                  */
657                 snprintf(default_rsp, msg_size,
658                          DEFAULT_RPC_RSP, agent,
659                          g_lutf_cfg.l_info.hb_info.node_name);
660                 *rsp = default_rsp;
661         }
662
663         return rc;
664 }
665
666 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
667 {
668         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
669         lutf_agent_blk_t *agent_blk;
670         int msg_size;
671         bool dead = false;
672
673         if (!agent || !yaml)
674                 goto out;
675
676         msg_size = strlen(yaml) + 1;
677
678         agent_blk = find_agent_blk_by_name(agent);
679         if (!agent_blk) {
680                 PERROR("Can't find agent with name: %s", agent);
681                 goto out;
682         }
683
684         MUTEX_LOCK(&agent_blk->mutex);
685         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
686                 MUTEX_UNLOCK(&agent_blk->mutex);
687                 PERROR("agent_blk %s doesn't have an RPC channel",
688                        agent_blk->name);
689                 goto release_agent;
690         }
691         MUTEX_UNLOCK(&agent_blk->mutex);
692
693         set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
694         PDEBUG("sending rpc response\n%s", yaml);
695         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
696                            EN_MSG_TYPE_RPC_RESPONSE);
697         if (rc)
698                 dead = true;
699 release_agent:
700         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
701         release_agent_blk(agent_blk, dead);
702         if (dead)
703                 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
704 out:
705         return rc;
706 }
707
708 void agent_init(void)
709 {
710         pthread_mutex_init(&agent_array_mutex, NULL);
711 }