1 #include <sys/socket.h>
3 #include <netinet/in.h>
11 #include "lutf_agent.h"
13 #include "lutf_python.h"
14 #include "lutf_listener.h"
/*
 * File-scope state for agent bookkeeping. The lookup and release routines
 * in this file take agent_array_mutex around their accesses to the two
 * slot tables below.
 */
16 static pthread_mutex_t agent_array_mutex;
/* Agents currently tracked as live; a NULL entry is a free slot. */
17 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
/* Agents parked by insert_dead_agent_locked(); a NULL entry is a free slot. */
18 static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
/* Shared scratch buffer that agent_state2str() formats into and returns. */
19 /* TODO: this is probably not thread safe */
20 static char agent_state_str[128];
/* Heartbeat switch: written by agent_enable_hb()/agent_disable_hb(),
 * read by agent_get_hb().
 */
22 static bool g_agent_enable_hb = true;
/* Local IPv4 address, formatted with inet_ntoa() in this file.
 * NOTE(review): confirm where this is assigned.
 */
23 static struct in_addr g_local_ip;
/* printf-style template for a failure RPC reply; the two %s conversions
 * fill the src and dst fields, in that order.
 */
25 #define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n"
/* Lock/unlock helpers, used on agent_array_mutex and on each agent's mutex. */
27 #define MUTEX_LOCK(x) \
30 #define MUTEX_UNLOCK(x) \
31 pthread_mutex_unlock(x)
/* Hands back g_local_ip in dotted-decimal text form. inet_ntoa() returns a
 * pointer to a statically allocated buffer that later calls overwrite, so
 * the caller must copy the string if it needs to keep it.
 */
35 return inet_ntoa(g_local_ip);
/*
 * Park an agent on the dead list: the first NULL slot of agent_dead_list
 * receives the agent and LUTF_AGENT_STATE_DEAD is OR-ed into its state.
 * The trailing assert fires if the scan ran off the end of the table,
 * i.e. no free slot was available.
 *
 * The only call site in this file (release_agent_blk()) invokes this with
 * both agent_array_mutex and agent->mutex held; no locking is done here.
 */
38 static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
42 for (i = 0; i < MAX_NUM_AGENTS; i++) {
43 if (agent_dead_list[i] == NULL) {
44 agent->state |= LUTF_AGENT_STATE_DEAD;
45 agent_dead_list[i] = agent;
50 assert(i < MAX_NUM_AGENTS);
/*
 * Drop a reference on an agent that sits on the dead list.
 *
 * Preconditions checked here: the agent carries LUTF_AGENT_STATE_DEAD,
 * occupies agent_dead_list[agent->id], and has a positive ref_count.
 * Once ref_count is zero the dead-list slot is cleared and the block is
 * overwritten with memset().
 *
 * Callers in this file hold agent_array_mutex; no locking is done here.
 *
 * NOTE(review): memset() converts its fill value to unsigned char, so
 * 0xdeadbeef poisons the block with 0xef bytes only.
 */
53 static void del_dead_agent_locked(lutf_agent_blk_t *agent)
56 agent->state & LUTF_AGENT_STATE_DEAD &&
57 agent_dead_list[agent->id] != NULL &&
58 agent_dead_list[agent->id] == agent);
60 assert(agent->ref_count > 0);
63 if (agent->ref_count == 0) {
64 agent_dead_list[agent->id] = NULL;
65 memset(agent, 0xdeadbeef, sizeof(*agent));
/*
 * Sweep the dead list under agent_array_mutex. Every entry that has
 * LUTF_AGENT_NEED_LISTEN_CLEAN set gets that flag cleared and is then
 * passed to del_dead_agent_locked(). Entries without the flag are left
 * untouched.
 */
70 void release_dead_list_agents(void)
74 MUTEX_LOCK(&agent_array_mutex);
75 for (i = 0; i < MAX_NUM_AGENTS; i++) {
76 lutf_agent_blk_t *agent;
78 agent = agent_dead_list[i];
80 if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) {
81 agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
82 del_dead_agent_locked(agent);
85 MUTEX_UNLOCK(&agent_array_mutex);
/*
 * Test the LUTF_AGENT_STATE_ALIVE bit of an agent while holding
 * agent->mutex. The lookup routines in this file call this with
 * agent_array_mutex already held, so the lock order is array mutex first,
 * agent mutex second.
 */
88 static inline bool agent_alive(lutf_agent_blk_t *agent)
92 MUTEX_LOCK(&agent->mutex);
93 if (agent->state & LUTF_AGENT_STATE_ALIVE)
95 MUTEX_UNLOCK(&agent->mutex);
/*
 * Release a reference on an agent block.
 *
 * Locks are taken in the order agent_array_mutex, then agent->mutex, and
 * each path below drops both before doing connection teardown. The paths,
 * in the order they appear:
 *  - agent flagged LUTF_AGENT_STATE_ALIVE: asserted to occupy
 *    agent_live_list[agent->id];
 *  - hand-off to del_dead_agent_locked() after dropping agent->mutex
 *    (NOTE(review): presumably the branch for agents that are not alive);
 *  - ref_count reached zero: the live slot is cleared, the agent must not
 *    be flagged LUTF_AGENT_WORK_IN_PROGRESS, the connection is closed and
 *    the block is overwritten with memset();
 *  - retire: the live slot is cleared, the agent is moved to the dead list,
 *    its ALIVE / RPC / HB channel flags are cleared and the connection is
 *    closed;
 *  - otherwise only the two locks are dropped.
 *
 * agent: block being released.
 * dead:  lutf_send_rpc() passes true on what appears to be its failure
 *        path; other callers in this file pass false or a local flag.
 *        NOTE(review): presumably selects the retire path — confirm.
 *
 * NOTE(review): memset() converts its fill value to unsigned char, so
 * 0xdeadbeef poisons the block with 0xef bytes only.
 */
100 void release_agent_blk(lutf_agent_blk_t *agent, int dead)
104 MUTEX_LOCK(&agent_array_mutex);
105 MUTEX_LOCK(&agent->mutex);
107 if (agent->state & LUTF_AGENT_STATE_ALIVE) {
109 assert(agent_live_list[agent->id] != NULL &&
110 agent_live_list[agent->id] == agent);
112 MUTEX_UNLOCK(&agent->mutex);
113 del_dead_agent_locked(agent);
114 MUTEX_UNLOCK(&agent_array_mutex);
118 assert(agent->ref_count > 0);
121 if (agent->ref_count == 0) {
122 agent_live_list[agent->id] = NULL;
123 assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
124 MUTEX_UNLOCK(&agent->mutex);
125 MUTEX_UNLOCK(&agent_array_mutex);
126 close_agent_connection(agent);
127 memset(agent, 0xdeadbeef, sizeof(*agent));
131 agent_live_list[agent->id] = NULL;
132 insert_dead_agent_locked(agent);
133 MUTEX_UNLOCK(&agent->mutex);
134 MUTEX_UNLOCK(&agent_array_mutex);
135 unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
136 unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED);
137 unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED);
138 close_agent_connection(agent);
140 MUTEX_UNLOCK(&agent->mutex);
141 MUTEX_UNLOCK(&agent_array_mutex);
/*
 * Counterpart of release_agent_blk(): the lookup routines in this file call
 * it on a matching agent before handing the block out. Work is done under
 * agent->mutex.
 * NOTE(review): presumably increments agent->ref_count, matching the
 * ref_count checks made on release — confirm.
 */
145 void acquire_agent_blk(lutf_agent_blk_t *agent)
147 /* acquire the agent blk mutex */
148 MUTEX_LOCK(&agent->mutex);
151 MUTEX_UNLOCK(&agent->mutex);
/*
 * Render an agent's state bits as text.
 *
 * The string starts with "alive " or "dead " depending on
 * LUTF_AGENT_STATE_ALIVE, followed by " HB", " RPC" and " WIP" for each of
 * the corresponding flags that is set. The literal "NULL PARAMETER" is the
 * alternative return value (NOTE(review): presumably for a NULL agent).
 *
 * The formatted result lives in the file-scope agent_state_str buffer, so
 * the returned pointer is shared by all callers and is overwritten by the
 * next call. agent->state is read here without taking agent->mutex.
 */
154 char *agent_state2str(lutf_agent_blk_t *agent)
157 return "NULL PARAMETER";
159 sprintf(agent_state_str, "%s%s%s%s",
160 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
161 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
162 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
163 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
165 return agent_state_str;
/*
 * Look up a live agent by IPv4 address.
 *
 * Scans agent_live_list under agent_array_mutex for an entry that is alive
 * and whose addr.sin_addr equals addr->sin_addr. Only the address is
 * compared; the port is not part of the match. A matching agent is passed
 * to acquire_agent_blk() before the array mutex is dropped, so the caller
 * is expected to pair a successful lookup with release_agent_blk().
 */
168 static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
171 lutf_agent_blk_t *agent;
176 MUTEX_LOCK(&agent_array_mutex);
177 for (i = 0; i < MAX_NUM_AGENTS; i++) {
178 agent = agent_live_list[i];
179 if ((agent) && agent_alive(agent) &&
180 (agent->addr.sin_addr.s_addr ==
181 addr->sin_addr.s_addr)) {
182 acquire_agent_blk(agent);
183 MUTEX_UNLOCK(&agent_array_mutex);
187 MUTEX_UNLOCK(&agent_array_mutex);
/*
 * Iterator-style lookup: starting at slot idx, scan agent_live_list under
 * agent_array_mutex for the next entry that is alive, and acquire it.
 *
 * idx: first slot to examine; values >= MAX_NUM_AGENTS are handled
 *      before the array mutex is taken.
 * out: NOTE(review): presumably receives the acquired agent, with the
 *      return value telling the caller where to resume — confirm.
 */
192 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
195 lutf_agent_blk_t *agent = NULL;
197 if (idx >= MAX_NUM_AGENTS)
200 MUTEX_LOCK(&agent_array_mutex);
201 for (i = idx; i < MAX_NUM_AGENTS; i++) {
202 agent = agent_live_list[i];
203 if (agent && agent_alive(agent)) {
205 acquire_agent_blk(agent);
209 MUTEX_UNLOCK(&agent_array_mutex);
/*
 * Find the agent for addr, or create one.
 *
 * find_agent_blk_by_addr() is tried first. find_free_agent_blk(addr)
 * supplies a new block (NOTE(review): presumably when the lookup found
 * nothing). For an existing agent, the reference taken by the lookup is
 * dropped again with release_agent_blk(agent, false).
 */
217 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
219 lutf_agent_blk_t *agent;
221 agent = find_agent_blk_by_addr(addr);
223 return find_free_agent_blk(addr);
224 release_agent_blk(agent, false);
/*
 * Compute the largest descriptor among the iFileDesc and iRpcFd fields of
 * the agents on the live list, scanning under agent_array_mutex. The
 * running maximum starts at INVALID_TCP_SOCKET.
 */
229 int lutf_agent_get_highest_fd(void)
231 lutf_agent_blk_t *agent;
232 int iMaxFd = INVALID_TCP_SOCKET;
235 MUTEX_LOCK(&agent_array_mutex);
236 for (i = 0; i < MAX_NUM_AGENTS; i++) {
237 agent = agent_live_list[i];
239 if (agent->iFileDesc > iMaxFd)
240 iMaxFd = agent->iFileDesc;
241 if (agent->iRpcFd > iMaxFd)
242 iMaxFd = agent->iRpcFd;
245 MUTEX_UNLOCK(&agent_array_mutex);
/* Clear the module-wide heartbeat flag (g_agent_enable_hb). No lock is taken. */
250 void agent_disable_hb(void)
252 g_agent_enable_hb = false;
/* Set the module-wide heartbeat flag (g_agent_enable_hb). No lock is taken. */
255 void agent_enable_hb(void)
257 g_agent_enable_hb = true;
/* Return the current heartbeat flag; the bool converts to 1 (enabled) or 0. */
260 int agent_get_hb(void)
262 return g_agent_enable_hb;
/*
 * Allocate a new agent block and install it in the first free slot of
 * agent_live_list.
 *
 * The slot search, the allocation and the installation all happen with
 * agent_array_mutex held. The lock is dropped early when the table is full
 * and again on the path that follows the allocation. The new block is
 * zero-filled by calloc(), time-stamped, has both descriptors set to
 * INVALID_TCP_SOCKET, is flagged LUTF_AGENT_STATE_ALIVE and is acquired
 * once before it becomes visible in the table.
 *
 * NOTE(review): set_agent_state() locks agent->mutex, yet it is called
 * before pthread_mutex_init() has run on that mutex; the two calls look
 * like they should be swapped.
 */
265 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
268 lutf_agent_blk_t *agent;
270 /* grab the lock for the array */
271 MUTEX_LOCK(&agent_array_mutex);
273 /* iterate through the array to find a free entry */
274 for (i = 0; i < MAX_NUM_AGENTS; i++) {
275 if (agent_live_list[i] == NULL)
279 if (i >= MAX_NUM_AGENTS) {
280 MUTEX_UNLOCK(&agent_array_mutex);
284 /* allocate a new agent blk and assign it to that entry */
285 agent = calloc(sizeof(char),
286 sizeof(lutf_agent_blk_t));
288 MUTEX_UNLOCK(&agent_array_mutex);
292 gettimeofday(&agent->time_stamp, NULL);
294 agent->iFileDesc = INVALID_TCP_SOCKET;
295 agent->iRpcFd = INVALID_TCP_SOCKET;
297 set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
299 pthread_mutex_init(&agent->mutex, NULL);
300 acquire_agent_blk(agent);
302 /* assign to array */
303 agent_live_list[i] = agent;
305 /* release the array mutex */
306 MUTEX_UNLOCK(&agent_array_mutex);
308 /* return the agent blk */
/*
 * Look up a live agent by its slot index.
 *
 * idx is range-checked against [0, MAX_NUM_AGENTS) before any lock is
 * taken. Under agent_array_mutex, an empty slot ends the lookup early;
 * otherwise the agent in the slot is acquired only if agent_alive()
 * reports it alive.
 */
312 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
314 lutf_agent_blk_t *agent;
316 if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
319 /* grab the array mutex */
320 MUTEX_LOCK(&agent_array_mutex);
322 /* if the blk is non null grab the mutex.
323 * possibly block until previous user is done
325 if (agent_live_list[idx] == NULL) {
326 MUTEX_UNLOCK(&agent_array_mutex);
330 agent = agent_live_list[idx];
332 if (agent_alive(agent))
333 acquire_agent_blk(agent);
337 /* release the array mutex */
338 MUTEX_UNLOCK(&agent_array_mutex);
340 /* return the agent blk */
/*
 * OR the given flag bits into agent->state while holding agent->mutex.
 * The mutex is taken here, so the caller must not already hold it unless
 * the mutex type permits relocking.
 */
344 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
346 MUTEX_LOCK(&agent->mutex);
347 agent->state |= state;
348 MUTEX_UNLOCK(&agent->mutex);
/*
 * Clear the given flag bits from agent->state while holding agent->mutex.
 * The mutex is taken here, so the caller must not already hold it unless
 * the mutex type permits relocking.
 */
351 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
353 MUTEX_LOCK(&agent->mutex);
354 agent->state &= ~state;
355 MUTEX_UNLOCK(&agent->mutex);
/*
 * Return the agent's IPv4 address in dotted-decimal text form.
 * inet_ntoa() returns a pointer to a statically allocated buffer that
 * later calls overwrite, so copy the string if it must be kept.
 */
358 char *agent_ip2str(lutf_agent_blk_t *agent)
363 return inet_ntoa(agent->addr.sin_addr);
/*
 * Walk agent_live_list and test each slot for a non-NULL entry; agent_alive()
 * is not consulted, so occupied slots qualify regardless of state flags.
 * NOTE(review): presumably returns the number of occupied slots — confirm.
 * NOTE(review): confirm whether agent_array_mutex is held during the scan.
 */
366 int get_num_agents(void)
371 for (i = 0; i < MAX_NUM_AGENTS; i++) {
372 if (agent_live_list[i] != NULL)
/*
 * Look up a live agent by name.
 *
 * Scans agent_live_list under agent_array_mutex. An alive entry matches
 * when its name equals the requested name, or when the requested name
 * equals TEST_ROLE_GRC. In the latter case any alive entry satisfies the
 * test, whatever that entry is called. The matching agent is acquired, so
 * the caller is expected to pair a successful lookup with
 * release_agent_blk().
 */
379 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
381 lutf_agent_blk_t *agent;
387 MUTEX_LOCK(&agent_array_mutex);
389 for (i = 0; i < MAX_NUM_AGENTS; i++) {
390 agent = agent_live_list[i];
391 if ((agent) && agent_alive(agent) &&
392 ((strcmp(agent->name, name) == 0) ||
393 (strcmp(name, TEST_ROLE_GRC) == 0))) {
394 acquire_agent_blk(agent);
401 MUTEX_UNLOCK(&agent_array_mutex);
403 /* return the agent blk */
/*
 * Look up a live agent by a dotted-decimal IPv4 string.
 *
 * The string is parsed with inet_aton() and the result is compared against
 * each alive entry's addr.sin_addr while agent_array_mutex is held. The
 * selected agent is acquired before the lock is dropped.
 *
 * NOTE(review): the return value of inet_aton() is not checked, so a
 * malformed string leaves addr.sin_addr with whatever it held before the
 * call — confirm addr is initialised beforehand.
 */
407 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
409 lutf_agent_blk_t *agent;
411 struct sockaddr_in addr;
416 inet_aton(ip, &addr.sin_addr);
418 /* grab the array mutex */
419 MUTEX_LOCK(&agent_array_mutex);
421 for (i = 0; i < MAX_NUM_AGENTS; i++) {
422 agent = agent_live_list[i];
423 if ((agent) && agent_alive(agent) &&
424 (agent->addr.sin_addr.s_addr ==
425 addr.sin_addr.s_addr))
432 acquire_agent_blk(agent);
434 /* release the array mutex */
435 MUTEX_UNLOCK(&agent_array_mutex);
437 /* return the agent blk */
/*
 * Poll until the agents named in a YAML sequence are registered, or until
 * the timeout expires.
 *
 * agents:  YAML sequence whose items carry an agent name in
 *          cy_valuestring. The "No agent to wait for" early exit returns
 *          EN_LUTF_RC_OK (NOTE(review): presumably taken when agents is
 *          NULL).
 * timeout: limit in seconds, compared against the tv_sec difference
 *          between two gettimeofday() samples (wall-clock time).
 *
 * Each pass walks the sequence, looks every name up with
 * find_agent_blk_by_name() and immediately drops the reference taken by a
 * lookup with release_agent_blk(agent, false).
 *
 * Returns EN_LUTF_RC_OK when the loop ended with found set, otherwise
 * EN_LUTF_RC_TIMEOUT.
 */
441 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
443 struct timeval start;
447 lutf_agent_blk_t *agent;
449 gettimeofday(&start, NULL);
450 gettimeofday(&now, NULL);
453 PDEBUG("No agent to wait for");
454 return EN_LUTF_RC_OK;
457 PDEBUG("Start waiting for Agents");
459 while (now.tv_sec - start.tv_sec < timeout && !found) {
461 PDEBUG("Waiting for Agents");
462 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
463 PDEBUG("Looking up: %s", a->cy_valuestring);
464 agent = find_agent_blk_by_name(a->cy_valuestring);
466 PDEBUG("agent %s found\n", agent->name);
467 release_agent_blk(agent, false);
475 gettimeofday(&now, NULL);
478 return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
/*
 * Ask a remote master how many agents it has registered.
 *
 * masterIP:   dotted-decimal IPv4 address of the master; rejected with
 *             EN_LUTF_RC_FAIL when inet_aton() cannot parse it.
 * masterPort: port of the master (NOTE(review): presumably passed to
 *             establishTCPConnection() — confirm).
 *
 * Sequence: connect, send an empty EN_MSG_TYPE_GET_NUM_AGENTS message, read
 * a header plus query body into recvBuf, check the header type, and take
 * the result from the body's num_agents field. The connection is closed
 * with closeTcpConnection().
 *
 * The return value doubles as count and status: msg_p->num_agents on
 * success, a failure code such as EN_LUTF_RC_FAIL otherwise.
 *
 * NOTE(review): recvBuf is sized sizeof(hdr) + sizeof(hdr), but the read
 * asks for sizeof(hdr) + sizeof(msg) bytes. If msg is larger than hdr, the
 * read overruns the buffer. The allocation looks like it should use
 * sizeof(msg).
 * NOTE(review): hdr_p->type is compared without ntohl(), whereas
 * lutf_send_rpc() converts header fields with ntohl() — confirm which is
 * right for this message.
 */
481 int get_num_agents_remote(char *masterIP, int masterPort)
484 lutf_msg_num_agents_query_t msg;
485 lutf_msg_num_agents_query_t *msg_p;
486 lutf_message_hdr_t hdr;
487 lutf_message_hdr_t *hdr_p;
488 int remoteSocket = INVALID_TCP_SOCKET;
490 char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
493 PERROR("out of memory");
494 rc = EN_LUTF_RC_FAIL;
498 if (!inet_aton(masterIP, &addr)) {
499 PERROR("bad master IP = %s", masterIP);
500 rc = EN_LUTF_RC_FAIL;
504 /* in network byte order, convert so we can have a
507 remoteSocket = establishTCPConnection(addr.s_addr,
510 if (remoteSocket < 0) {
511 PERROR("establishTCPConnection failure: %s",
512 lutf_rc2str(remoteSocket));
517 rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
521 rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
522 TCP_READ_TIMEOUT_SEC);
524 PERROR("failed to receive response");
528 hdr_p = (lutf_message_hdr_t *)recvBuf;
529 msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
531 if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
532 PERROR("Unexpected message. Waiting for num agents received %d",
534 rc = EN_LUTF_RC_FAIL;
538 rc = msg_p->num_agents;
541 closeTcpConnection(remoteSocket);
/*
 * Send a YAML RPC request to a named agent and collect the response.
 *
 * agent:   name of the target agent, resolved with
 *          find_agent_blk_by_name().
 * yaml:    NUL-terminated request text; strlen(yaml) + 1 bytes are sent.
 * timeout: passed to readTcpMessage() for the header read and logged for
 *          the body read.
 * rsp:     out parameter. agent, yaml and rsp are all NULL-checked up
 *          front. The existing comment in the body states that the caller
 *          owns and frees the returned buffer.
 *
 * Flow:
 *  1. If the agent is not flagged LUTF_AGENT_RPC_CHANNEL_CONNECTED (tested
 *     under its mutex), an RPC connection to its address and listen_port
 *     is opened and the flag is set.
 *  2. The agent is flagged LUTF_AGENT_WORK_IN_PROGRESS for the duration of
 *     the exchange.
 *  3. The request goes out as EN_MSG_TYPE_RPC_REQUEST. A header is read
 *     back and validated (type EN_MSG_TYPE_RPC_RESPONSE, version
 *     LUTF_VERSION_NUMBER, both after ntohl()), then ntohl(hdr.len) bytes
 *     of body are read into a freshly allocated buffer.
 *  4. On success the WIP flag is cleared, the agent is released with
 *     dead == false and EN_LUTF_RC_OK is returned.
 *  5. On failure the WIP flag is cleared, LUTF_AGENT_NEED_LISTEN_CLEAN is
 *     set, the agent is released with dead == true, and a default failure
 *     reply is built from DEFAULT_RPC_RSP.
 *
 * NOTE(review): the "Bad response" log labels its values "version" then
 * "type" but passes hdr.type then hdr.version, and neither goes through
 * ntohl().
 * NOTE(review): the default-reply size is computed from agent_blk->name
 * after release_agent_blk(agent_blk, true). The same code appears to be
 * reached via fail_rpc_no_agent, where the lookup failed and agent_blk is
 * presumably NULL. snprintf() formats the agent argument, so
 * strlen(agent) looks like the intended term — confirm.
 * NOTE(review): hdr.len comes from the peer and is used unchecked as the
 * allocation and read size.
 */
546 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
548 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
549 lutf_agent_blk_t *agent_blk;
551 lutf_message_hdr_t hdr;
552 char *recvBuf = NULL;
555 if (!agent || !yaml || !rsp)
558 msg_size = strlen(yaml) + 1;
560 PDEBUG("sending rpc request\n%s", yaml);
562 agent_blk = find_agent_blk_by_name(agent);
564 PERROR("Can't find agent with name: %s", agent);
565 goto fail_rpc_no_agent;
568 MUTEX_LOCK(&agent_blk->mutex);
569 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
570 MUTEX_UNLOCK(&agent_blk->mutex);
571 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
573 inet_ntoa(agent_blk->addr.sin_addr),
574 agent_blk->listen_port);
575 /* in network byte order, convert so we can have a
578 agent_blk->iRpcFd = establishTCPConnection(
579 agent_blk->addr.sin_addr.s_addr,
580 htons(agent_blk->listen_port),
582 if (agent_blk->iRpcFd < 0)
584 set_agent_state(agent_blk,
585 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
587 MUTEX_UNLOCK(&agent_blk->mutex);
590 set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
592 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
593 EN_MSG_TYPE_RPC_REQUEST);
594 if (rc != EN_LUTF_RC_OK) {
595 PERROR("Failed to send rpc message: %s", yaml);
599 /* wait for the response */
600 rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
601 sizeof(hdr), timeout);
602 if (rc != EN_LUTF_RC_OK) {
603 PERROR("Failed to recv rpc header in timeout %d",
608 if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
609 ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
610 PERROR("Bad response. version %d, type:%d\n",
611 hdr.type, hdr.version);
615 recvBuf = calloc(ntohl(hdr.len), 1);
617 PERROR("Failed to allocate buffer to recv rpc response");
621 rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
623 if (rc != EN_LUTF_RC_OK) {
624 PERROR("Failed to recv rpc body in timeout %d", timeout);
629 * once recvBuf is given back to the caller, it's expected that
630 * the caller will manage the memory and free when done. This is
631 * mainly called from python. The SWIG wrapper frees the memory
635 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
636 release_agent_blk(agent_blk, false);
638 return EN_LUTF_RC_OK;
641 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
642 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
643 release_agent_blk(agent_blk, true);
646 msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
647 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
649 default_rsp = calloc(msg_size, 1);
651 PERROR("Failed to allocate buffer for default response");
654 /* the source for the response would be the agent we sent
655 * to and the destination is me
657 snprintf(default_rsp, msg_size,
658 DEFAULT_RPC_RSP, agent,
659 g_lutf_cfg.l_info.hb_info.node_name);
/*
 * Send a YAML RPC response to a named agent over its existing RPC channel.
 *
 * agent: name of the target agent, resolved with
 *        find_agent_blk_by_name().
 * yaml:  NUL-terminated response text; strlen(yaml) + 1 bytes are sent as
 *        EN_MSG_TYPE_RPC_RESPONSE.
 *
 * Unlike lutf_send_rpc(), this routine never opens a connection: an agent
 * that is not flagged LUTF_AGENT_RPC_CHANNEL_CONNECTED (tested under its
 * mutex) is reported as an error. The agent is flagged
 * LUTF_AGENT_WORK_IN_PROGRESS while the message is sent, then released
 * with the local dead flag.
 *
 * NOTE(review): set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN)
 * appears after release_agent_blk(), i.e. it touches the block after this
 * routine's reference was dropped. lutf_send_rpc() sets the flag before
 * releasing — confirm the intended order.
 */
666 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
668 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
669 lutf_agent_blk_t *agent_blk;
676 msg_size = strlen(yaml) + 1;
678 agent_blk = find_agent_blk_by_name(agent);
680 PERROR("Can't find agent with name: %s", agent);
684 MUTEX_LOCK(&agent_blk->mutex);
685 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
686 MUTEX_UNLOCK(&agent_blk->mutex);
687 PERROR("agent_blk %s doesn't have an RPC channel",
691 MUTEX_UNLOCK(&agent_blk->mutex);
693 set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
694 PDEBUG("sending rpc response\n%s", yaml);
695 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
696 EN_MSG_TYPE_RPC_RESPONSE);
700 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
701 release_agent_blk(agent_blk, dead);
703 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
/*
 * Initialise agent_array_mutex with default attributes. This must run
 * before any routine in this file that locks the array mutex.
 */
708 void agent_init(void)
710 pthread_mutex_init(&agent_array_mutex, NULL);