Whamcloud - gitweb
970c6c69f382223e51ac65f29aba75eb99e5c501
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_agent.c
1 #include <sys/socket.h>
2 #include <sys/time.h>
3 #include <netinet/in.h>
4 #include <arpa/inet.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <unistd.h>
9 #include <assert.h>
10 #include "cyaml.h"
11 #include "lutf_agent.h"
12 #include "lutf.h"
13 #include "lutf_python.h"
14
15 static pthread_mutex_t agent_array_mutex;
16 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
17 /* TODO: this is probably not thread safe */
18 static char agent_state_str[128];
19
20 extern bool g_agent_enable_hb;
21 extern struct in_addr g_local_ip;
22
23 #define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
24
25 #define MUTEX_LOCK(x) \
26   pthread_mutex_lock(x)
27
28 #define MUTEX_UNLOCK(x) \
29   pthread_mutex_unlock(x)
30
31 char *get_local_ip()
32 {
33         return inet_ntoa(g_local_ip);
34 }
35
36 void release_agent_blk(lutf_agent_blk_t *agent)
37 {
38         /* release the agent blk mutex */
39         MUTEX_LOCK(&agent->mutex);
40         if (agent) {
41                 assert(agent->ref_count != 0);
42                 agent->ref_count--;
43         }
44         MUTEX_UNLOCK(&agent->mutex);
45 }
46
47 void acquire_agent_blk(lutf_agent_blk_t *agent)
48 {
49         /* acquire the agent blk mutex */
50         MUTEX_LOCK(&agent->mutex);
51         if (agent)
52                 agent->ref_count++;
53         MUTEX_UNLOCK(&agent->mutex);
54 }
55
56 char *agent_state2str(lutf_agent_blk_t *agent)
57 {
58         if (!agent)
59                 return "NULL PARAMETER";
60
61         sprintf(agent_state_str, "%s%s%s%s",
62                 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
63                 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
64                 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
65                 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
66
67         return agent_state_str;
68 }
69
70 static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
71                                                 struct sockaddr_in *addr)
72 {
73         int i;
74         lutf_agent_blk_t *agent;
75
76         if (!addr)
77                 return NULL;
78
79         MUTEX_LOCK(&agent_array_mutex);
80         for (i = 0; i < MAX_NUM_AGENTS; i++) {
81                 agent = list[i];
82                 if ((agent) &&
83                     (agent->addr.sin_addr.s_addr ==
84                      addr->sin_addr.s_addr)) {
85                         MUTEX_UNLOCK(&agent_array_mutex);
86                         return agent;
87                 }
88         }
89         MUTEX_UNLOCK(&agent_array_mutex);
90
91         return NULL;
92 }
93
94 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
95 {
96         int i = idx;
97         lutf_agent_blk_t *agent = NULL;
98
99         if (idx >= MAX_NUM_AGENTS)
100                 goto out;
101
102         MUTEX_LOCK(&agent_array_mutex);
103         for (i = idx; i < MAX_NUM_AGENTS; i++) {
104                 agent = agent_live_list[i];
105                 if (agent) {
106                         i++;
107                         acquire_agent_blk(agent);
108                         break;
109                 }
110         }
111         MUTEX_UNLOCK(&agent_array_mutex);
112
113 out:
114         *out = agent;
115
116         return i;
117 }
118
119 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
120 {
121         lutf_agent_blk_t *agent;
122         agent = find_agent_blk_by_addr(agent_live_list, addr);
123         if (!agent)
124                 return find_free_agent_blk(addr);
125
126         MUTEX_LOCK(&agent_array_mutex);
127         acquire_agent_blk(agent);
128         MUTEX_UNLOCK(&agent_array_mutex);
129
130         return agent;
131 }
132
133 int lutf_agent_get_highest_fd(void)
134 {
135         lutf_agent_blk_t *agent;
136         int iMaxFd = INVALID_TCP_SOCKET;
137         int i;
138
139         MUTEX_LOCK(&agent_array_mutex);
140         for (i = 0; i < MAX_NUM_AGENTS; i++) {
141                 agent = agent_live_list[i];
142                 if (agent) {
143                         if (agent->iFileDesc > iMaxFd)
144                                 iMaxFd = agent->iFileDesc;
145                         if (agent->iRpcFd > iMaxFd)
146                                 iMaxFd = agent->iRpcFd;
147                 }
148         }
149         MUTEX_UNLOCK(&agent_array_mutex);
150
151         return iMaxFd;
152 }
153
154 void agent_disable_hb(void)
155 {
156         g_agent_enable_hb = false;
157 }
158
159 void agent_enable_hb(void)
160 {
161         g_agent_enable_hb = true;
162 }
163
164 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
165 {
166         int i = 0;
167         lutf_agent_blk_t *agent;
168
169         /* grab the lock for the array */
170         MUTEX_LOCK(&agent_array_mutex);
171
172         /* iterate through the array to find a free entry */
173         while (agent_live_list[i] != NULL)
174                 i++;
175
176         if (i >= MAX_NUM_AGENTS) {
177                 MUTEX_UNLOCK(&agent_array_mutex);
178                 return NULL;
179         }
180
181         /* allocate a new agent blk and assign it to that entry */
182         agent = calloc(sizeof(char),
183                 sizeof(lutf_agent_blk_t));
184         if (!agent) {
185                 MUTEX_UNLOCK(&agent_array_mutex);
186                 return NULL;
187         }
188
189         gettimeofday(&agent->time_stamp, NULL);
190         agent->id = i;
191         agent->iFileDesc = INVALID_TCP_SOCKET;
192         agent->iRpcFd = INVALID_TCP_SOCKET;
193         agent->addr = *addr;
194         set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
195
196         pthread_mutex_init(&agent->mutex, NULL);
197         acquire_agent_blk(agent);
198
199         /* assign to array */
200         agent_live_list[i] = agent;
201
202         /* release the array mutex */
203         MUTEX_UNLOCK(&agent_array_mutex);
204
205         /* return the agent blk */
206         return agent;
207 }
208
209 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
210 {
211         lutf_agent_blk_t *agent;
212
213         if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
214                 return NULL;
215
216         /* grab the array mutex */
217         MUTEX_LOCK(&agent_array_mutex);
218
219         /* if the blk is non null grab the mutex.
220          * possibly block until previous user is done
221          */
222         if (agent_live_list[idx] == NULL) {
223                 MUTEX_UNLOCK(&agent_array_mutex);
224                 return NULL;
225         }
226
227         agent = agent_live_list[idx];
228
229         acquire_agent_blk(agent);
230
231         /* release the array mutex */
232         MUTEX_UNLOCK(&agent_array_mutex);
233
234         /* return the agent blk */
235         return agent;
236 }
237
238 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
239 {
240         MUTEX_LOCK(&agent->mutex);
241         agent->state |= state;
242         MUTEX_UNLOCK(&agent->mutex);
243 }
244
245 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
246 {
247         bool zombie = false;
248
249         MUTEX_LOCK(&agent->mutex);
250         agent->state &= ~state;
251         if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
252             (agent->state & LUTF_AGENT_ZOMBIE))
253                 zombie = true;
254         MUTEX_UNLOCK(&agent->mutex);
255
256         if (zombie)
257                 free_agent_blk(agent->id);
258 }
259
260 void free_agent_blk(int id)
261 {
262         lutf_agent_blk_t *agent;
263
264         /* grab the array mutex */
265         MUTEX_LOCK(&agent_array_mutex);
266
267         /* if the blk is non null grab the mutex.
268          * possibly block until previous user is done
269          */
270         if (agent_live_list[id] == NULL) {
271                 MUTEX_UNLOCK(&agent_array_mutex);
272                 return;
273         }
274
275         agent = agent_live_list[id];
276
277         MUTEX_LOCK(&agent->mutex);
278         if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
279                 MUTEX_UNLOCK(&agent->mutex);
280                 MUTEX_UNLOCK(&agent_array_mutex);
281                 PDEBUG("delay deleting agent %s\n", agent->name);
282                 set_agent_state(agent, LUTF_AGENT_ZOMBIE);
283                 return;
284         }
285         MUTEX_UNLOCK(&agent->mutex);
286
287         agent_live_list[id] = NULL;
288
289         /* release the array mutex */
290         MUTEX_UNLOCK(&agent_array_mutex);
291
292         /* free the block */
293         free(agent);
294 }
295
296 char *agent_ip2str(lutf_agent_blk_t *agent)
297 {
298         if (!agent)
299                 return NULL;
300
301         return inet_ntoa(agent->addr.sin_addr);
302 }
303
304 int get_num_agents(void)
305 {
306         int i;
307         int num = 0;
308         for (i = 0; i < MAX_NUM_AGENTS; i++) {
309                 if (agent_live_list[i] != NULL)
310                         num++;
311         }
312
313         return num;
314 }
315
316 /* no lock version of the function */
317 static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
318 {
319         lutf_agent_blk_t *agent;
320         int i;
321
322         if (!name)
323                 return NULL;
324
325         MUTEX_LOCK(&agent_array_mutex);
326
327         for (i = 0; i < MAX_NUM_AGENTS; i++) {
328                 agent = agent_live_list[i];
329                 if ((agent) &&
330                     ((strcmp(agent->name, name) == 0) ||
331                      (strcmp(name, TEST_ROLE_GRC) == 0))) {
332                         break;
333                 }
334         }
335
336         MUTEX_UNLOCK(&agent_array_mutex);
337
338         /* return the agent blk */
339         return agent;
340 }
341
342 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
343 {
344         lutf_agent_blk_t *agent;
345
346         agent = find_agent_blk_by_name_nl(name);
347         if (agent)
348                 acquire_agent_blk(agent);
349
350         /* return the agent blk */
351         return agent;
352 }
353
354 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
355 {
356         lutf_agent_blk_t *agent;
357         int i;
358         struct sockaddr_in addr;
359
360         if (!ip)
361                 return NULL;
362
363         inet_aton(ip, &addr.sin_addr);
364
365         /* grab the array mutex */
366         MUTEX_LOCK(&agent_array_mutex);
367
368         for (i = 0; i < MAX_NUM_AGENTS; i++) {
369                 agent = agent_live_list[i];
370                 if ((agent) && (agent->addr.sin_addr.s_addr ==
371                                 addr.sin_addr.s_addr))
372                         break;
373         }
374
375         if (agent)
376                 acquire_agent_blk(agent);
377
378         /* release the array mutex */
379         MUTEX_UNLOCK(&agent_array_mutex);
380
381         /* return the agent blk */
382         return agent;
383 }
384
385 void agent_hb_check(struct timeval *t, lutf_type_t me)
386 {
387         lutf_agent_blk_t *agent;
388         int i;
389
390         /* grab the array mutex */
391         MUTEX_LOCK(&agent_array_mutex);
392
393         for (i = 0; i < MAX_NUM_AGENTS; i++) {
394                 agent = agent_live_list[i];
395                 if (agent && agent->node_type != me) {
396                         acquire_agent_blk(agent);
397                         if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
398                                 int agent_id = agent->id;
399                                 /* agent didn't send a HB move to dead
400                                  * list
401                                  */
402                                 PERROR("agent %s presumed dead", agent->name);
403                                 release_agent_blk(agent);
404                                 MUTEX_UNLOCK(&agent_array_mutex);
405                                 /* free_agent_blk() grabs the mutex */
406                                 free_agent_blk(agent_id);
407                                 MUTEX_LOCK(&agent_array_mutex);
408                                 continue;
409                         }
410                         release_agent_blk(agent);
411                 }
412         }
413
414         /* release the array mutex */
415         MUTEX_UNLOCK(&agent_array_mutex);
416 }
417
418 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
419 {
420         struct timeval start;
421         struct timeval now;
422         struct cYAML *a;
423         bool found = false;
424         lutf_agent_blk_t *agent;
425
426         gettimeofday(&start, NULL);
427         gettimeofday(&now, NULL);
428
429         if (!agents) {
430                 PDEBUG("No agent to wait for");
431                 return EN_LUTF_RC_OK;
432         }
433
434         PDEBUG("Start waiting for Agents");
435
436         while (now.tv_sec - start.tv_sec < timeout && !found) {
437                 found = true;
438                 PDEBUG("Waiting for Agents");
439                 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
440                         PDEBUG("Looking up: %s", a->cy_valuestring);
441                         if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
442                                 found = false;
443                                 break;
444                         } else {
445                                 PDEBUG("agent %s found\n", agent->name);
446                                 release_agent_blk(agent);
447                         }
448                 }
449                 if (!found)
450                         sleep(1);
451                 gettimeofday(&now, NULL);
452         }
453
454         return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
455 }
456
457 int get_num_agents_remote(char *masterIP, int masterPort)
458 {
459         lutf_rc_t rc;
460         lutf_msg_num_agents_query_t msg;
461         lutf_msg_num_agents_query_t *msg_p;
462         lutf_message_hdr_t hdr;
463         lutf_message_hdr_t *hdr_p;
464         int remoteSocket = INVALID_TCP_SOCKET;
465         struct in_addr addr;
466         char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
467
468         if (!recvBuf) {
469                 PERROR("out of memory");
470                 rc = EN_LUTF_RC_FAIL;
471                 goto out;
472         }
473
474         if (!inet_aton(masterIP, &addr)) {
475                 PERROR("bad master IP = %s", masterIP);
476                 rc = EN_LUTF_RC_FAIL;
477                 goto out;
478         }
479
480         /* in network byte order, convert so we can have a
481          * uniform API
482          */
483         remoteSocket = establishTCPConnection(addr.s_addr,
484                                                 htons(masterPort),
485                                                 false, false);
486         if (remoteSocket < 0) {
487                 PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
488                 rc = remoteSocket;
489                 goto out;
490         }
491
492         rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
493         if (rc)
494                 goto out;
495
496         rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
497                             TCP_READ_TIMEOUT_SEC);
498         if (rc) {
499                 PERROR("failed to receive response");
500                 goto out;
501         }
502
503         hdr_p = (lutf_message_hdr_t *)recvBuf;
504         msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
505
506         if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
507                 PERROR("Unexpected message. Waiting for num agents received %d",
508                        hdr_p->type);
509                 rc = EN_LUTF_RC_FAIL;
510                 goto out;
511         }
512
513         rc = msg_p->num_agents;
514
515 out:
516         closeTcpConnection(remoteSocket);
517         free(recvBuf);
518         return rc;
519 }
520
521 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
522 {
523         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
524         lutf_agent_blk_t *agent_blk;
525         char *default_rsp;
526         lutf_message_hdr_t hdr;
527         char *recvBuf = NULL;
528         int msg_size;
529
530         if (!agent || !yaml || !rsp)
531                 goto fail_rpc;
532
533         msg_size = strlen(yaml) + 1;
534
535         PDEBUG("sending rpc request\n%s", yaml);
536
537         agent_blk = find_agent_blk_by_name(agent);
538         if (!agent_blk) {
539                 PERROR("Can't find agent with name: %s", agent);
540                 goto fail_rpc_no_agent;
541         }
542
543         MUTEX_LOCK(&agent_blk->mutex);
544         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
545                 MUTEX_UNLOCK(&agent_blk->mutex);
546                 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
547                        agent_blk->name,
548                        inet_ntoa(agent_blk->addr.sin_addr),
549                        agent_blk->listen_port);
550                 /* in network byte order, convert so we can have a uniform API */
551                 agent_blk->iRpcFd = establishTCPConnection(
552                                 agent_blk->addr.sin_addr.s_addr,
553                                 htons(agent_blk->listen_port),
554                                 false, false);
555                 if (agent_blk->iRpcFd < 0)
556                         goto fail_rpc;
557                 set_agent_state(agent_blk,
558                                 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
559         } else {
560                 MUTEX_UNLOCK(&agent_blk->mutex);
561         }
562
563         set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
564
565         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
566                            EN_MSG_TYPE_RPC_REQUEST);
567         if (rc != EN_LUTF_RC_OK) {
568                 PERROR("Failed to send rpc message: %s", yaml);
569                 goto fail_rpc;
570         }
571
572         /* wait for the response */
573         rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
574                             sizeof(hdr), timeout);
575         if (rc != EN_LUTF_RC_OK) {
576                 PERROR("Failed to recv rpc header in timeout %d",
577                        timeout);
578                 goto fail_rpc;
579         }
580
581         if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
582             ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
583                 PERROR("Bad response. version %d, type:%d\n",
584                        hdr.type, hdr.version);
585                 goto fail_rpc;
586         }
587
588         recvBuf = calloc(ntohl(hdr.len), 1);
589         if (!recvBuf) {
590                 PERROR("Failed to allocate buffer to recv rpc response");
591                 goto fail_rpc;
592         }
593
594         rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
595         if (rc != EN_LUTF_RC_OK) {
596                 PERROR("Failed to recv rpc body in timeout %d", timeout);
597                 goto fail_rpc;
598         }
599
600         /*
601          * once recvBuf is given back to the caller, it's expected that
602          * the caller will manage the memory and free when done. This is
603          * mainly called from python. The SWIG wrapper frees the memory
604          * appropriately.
605          */
606         *rsp = recvBuf;
607         release_agent_blk(agent_blk);
608
609         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
610
611         return EN_LUTF_RC_OK;
612
613 fail_rpc:
614         release_agent_blk(agent_blk);
615         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
616         if (recvBuf)
617                 free(recvBuf);
618         msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
619                 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
620 fail_rpc_no_agent:
621         default_rsp = calloc(msg_size, 1);
622         if (!default_rsp) {
623                 PERROR("Failed to allocate buffer for default response");
624                 *rsp = NULL;
625         } else {
626                 /* the source for the response would be the agent we sent
627                  * to and the destination is me
628                  */
629                 snprintf(default_rsp, msg_size,
630                          DEFAULT_RPC_RSP, agent,
631                          g_lutf_cfg.l_info.hb_info.node_name);
632                 *rsp = default_rsp;
633         }
634
635         return rc;
636 }
637
638 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
639 {
640         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
641         lutf_agent_blk_t *agent_blk;
642         int msg_size;
643
644         if (!agent || !yaml)
645                 goto out;
646
647         msg_size = strlen(yaml) + 1;
648
649         agent_blk = find_agent_blk_by_name_nl(agent);
650         if (!agent_blk) {
651                 PERROR("Can't find agent with name: %s", agent);
652                 goto out;
653         }
654
655         MUTEX_LOCK(&agent_blk->mutex);
656         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
657                 MUTEX_UNLOCK(&agent_blk->mutex);
658                 PERROR("agent_blk %s doesn't have an RPC channel",
659                        agent_blk->name);
660                 goto out;
661         }
662         MUTEX_UNLOCK(&agent_blk->mutex);
663
664         PDEBUG("sending rpc response\n%s", yaml);
665         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
666                            EN_MSG_TYPE_RPC_RESPONSE);
667
668 out:
669         return rc;
670 }
671
672 void agent_init(void)
673 {
674         pthread_mutex_init(&agent_array_mutex, NULL);
675 }