1 #include <sys/socket.h>
3 #include <netinet/in.h>
11 #include "lutf_agent.h"
13 #include "lutf_python.h"
// File-scope agent registry state.
// agent_array_mutex guards agent_live_list; it is initialized by agent_init().
15 static pthread_mutex_t agent_array_mutex;
// Slot table of known agents, indexed by agent id; a NULL slot is free
// (find_free_agent_blk() takes the first NULL slot).
16 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
17 /* TODO: this is probably not thread safe */
// Scratch buffer returned by agent_state2str(); every call overwrites it.
18 static char agent_state_str[128];
// Defined elsewhere; agent_enable_hb()/agent_disable_hb() set and clear it.
20 extern bool g_agent_enable_hb;
21 extern struct in_addr g_local_ip;
// Fallback RPC reply that lutf_send_rpc() builds on failure. The two %s are the
// source (the agent the request was sent to) and the destination (this node).
23 #define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n"
// NOTE(review): the continuation line(s) of MUTEX_LOCK are not visible here;
// presumably pthread_mutex_lock(x), mirroring MUTEX_UNLOCK — confirm.
25 #define MUTEX_LOCK(x) \
28 #define MUTEX_UNLOCK(x) \
29 pthread_mutex_unlock(x)
33 return inet_ntoa(g_local_ip);
// Drop one reference on an agent blk. Under agent->mutex it asserts that
// ref_count is non-zero. The decrement itself is on a line not visible here
// (presumably agent->ref_count-- — confirm).
// NOTE(review): &agent->mutex is locked before any visible NULL check of
// agent, so callers must never pass NULL.
36 void release_agent_blk(lutf_agent_blk_t *agent)
38 /* release the agent blk mutex */
39 MUTEX_LOCK(&agent->mutex);
41 assert(agent->ref_count != 0);
44 MUTEX_UNLOCK(&agent->mutex);
// Take one reference on an agent blk; the counterpart of release_agent_blk().
// Only the locking of agent->mutex is visible here. Presumably ref_count is
// incremented on the hidden lines in between — confirm.
47 void acquire_agent_blk(lutf_agent_blk_t *agent)
49 /* acquire the agent blk mutex */
50 MUTEX_LOCK(&agent->mutex);
53 MUTEX_UNLOCK(&agent->mutex);
// Render agent->state as text: "alive " or "dead ", followed by " HB",
// " RPC" and " WIP" for each of those flag bits that is set.
// Returns the file-static agent_state_str buffer, which the next call
// overwrites, so the result must be consumed before calling again and is not
// safe to share between threads. Returns the literal "NULL PARAMETER" on the
// early-exit path (presumably agent == NULL; the guard line is not visible).
56 char *agent_state2str(lutf_agent_blk_t *agent)
59 return "NULL PARAMETER";
61 sprintf(agent_state_str, "%s%s%s%s",
62 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
63 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
64 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
65 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
67 return agent_state_str;
// Find the live agent whose IPv4 address equals addr->sin_addr.s_addr. It
// scans all MAX_NUM_AGENTS slots under agent_array_mutex. Only sin_addr is
// compared in the visible condition.
// Several lines are not visible here: the one that loads `agent` for slot i
// (and whether it reads `list` or agent_live_list), and the return statements.
// Presumably it returns the match or NULL.
// NOTE(review): no reference is taken before the array mutex is dropped, so
// callers receive an unreferenced pointer.
70 static lutf_agent_blk_t *find_agent_blk_by_addr(lutf_agent_blk_t **list,
71 struct sockaddr_in *addr)
74 lutf_agent_blk_t *agent;
79 MUTEX_LOCK(&agent_array_mutex);
80 for (i = 0; i < MAX_NUM_AGENTS; i++) {
83 (agent->addr.sin_addr.s_addr ==
84 addr->sin_addr.s_addr)) {
85 MUTEX_UNLOCK(&agent_array_mutex);
89 MUTEX_UNLOCK(&agent_array_mutex);
// Walk agent_live_list starting at slot idx and take a reference
// (acquire_agent_blk) on an agent, all under agent_array_mutex.
// An idx >= MAX_NUM_AGENTS exits early, before locking.
// The slot test, the store to *out and the return value are on lines not
// visible here. Presumably the first non-NULL slot is returned through *out
// together with the index to resume from, and the caller drops the reference
// with release_agent_blk() — confirm.
// NOTE(review): a negative idx is not rejected in the visible code and would
// index agent_live_list out of bounds.
94 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
97 lutf_agent_blk_t *agent = NULL;
99 if (idx >= MAX_NUM_AGENTS)
102 MUTEX_LOCK(&agent_array_mutex);
103 for (i = idx; i < MAX_NUM_AGENTS; i++) {
104 agent = agent_live_list[i];
107 acquire_agent_blk(agent);
111 MUTEX_UNLOCK(&agent_array_mutex);
// Return the agent blk registered for addr with a reference held. When the
// lookup misses, it instead returns a new blk from find_free_agent_blk(),
// which already holds a reference. The miss test itself is on a line not
// visible here.
// NOTE(review): find_agent_blk_by_addr() releases agent_array_mutex before it
// is re-taken below to acquire the blk, so the blk may be freed in that
// window. Two concurrent callers for a new address can also both miss and
// create duplicate entries.
119 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
121 lutf_agent_blk_t *agent;
122 agent = find_agent_blk_by_addr(agent_live_list, addr);
124 return find_free_agent_blk(addr);
126 MUTEX_LOCK(&agent_array_mutex);
127 acquire_agent_blk(agent);
128 MUTEX_UNLOCK(&agent_array_mutex);
// Return the highest socket descriptor (iFileDesc or iRpcFd) over all live
// agents, or INVALID_TCP_SOCKET when none is larger. It is computed under
// agent_array_mutex. The per-slot NULL check and the return are on lines not
// visible here. Presumably used to size a select()-style fd set — confirm.
133 int lutf_agent_get_highest_fd(void)
135 lutf_agent_blk_t *agent;
136 int iMaxFd = INVALID_TCP_SOCKET;
139 MUTEX_LOCK(&agent_array_mutex);
140 for (i = 0; i < MAX_NUM_AGENTS; i++) {
141 agent = agent_live_list[i];
143 if (agent->iFileDesc > iMaxFd)
144 iMaxFd = agent->iFileDesc;
145 if (agent->iRpcFd > iMaxFd)
146 iMaxFd = agent->iRpcFd;
149 MUTEX_UNLOCK(&agent_array_mutex);
// Clear the global g_agent_enable_hb flag; agent_enable_hb() sets it again.
// NOTE(review): a plain bool written without synchronization; if other
// threads poll it, an atomic_bool would be the portable choice.
154 void agent_disable_hb(void)
156 g_agent_enable_hb = false;
// Set the global g_agent_enable_hb flag; agent_disable_hb() clears it.
159 void agent_enable_hb(void)
161 g_agent_enable_hb = true;
// Allocate a fresh agent blk in the first free (NULL) slot of agent_live_list
// and return it with one reference held (acquire_agent_blk).
// The new blk starts with:
//   - time_stamp set to now,
//   - both sockets set to INVALID_TCP_SOCKET,
//   - LUTF_AGENT_STATE_ALIVE set.
// Presumably it returns NULL when the table is full or calloc fails; those
// return statements are on lines not visible here — confirm.
164 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
167 lutf_agent_blk_t *agent;
169 /* grab the lock for the array */
170 MUTEX_LOCK(&agent_array_mutex);
172 /* iterate through the array to find a free entry */
// bound the scan so a full table never reads agent_live_list[MAX_NUM_AGENTS]
173 while (i < MAX_NUM_AGENTS && agent_live_list[i] != NULL)
176 if (i >= MAX_NUM_AGENTS) {
177 MUTEX_UNLOCK(&agent_array_mutex);
181 /* allocate a new agent blk and assign it to that entry */
182 agent = calloc(1,
183 sizeof(*agent));
185 MUTEX_UNLOCK(&agent_array_mutex);
189 gettimeofday(&agent->time_stamp, NULL);
191 agent->iFileDesc = INVALID_TCP_SOCKET;
192 agent->iRpcFd = INVALID_TCP_SOCKET;
// Initialize the blk mutex first: set_agent_state() and acquire_agent_blk()
// both lock it. The blk is not yet published in agent_live_list, so no other
// thread can observe the intermediate state.
196 pthread_mutex_init(&agent->mutex, NULL);
194 set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
197 acquire_agent_blk(agent);
199 /* assign to array */
200 agent_live_list[i] = agent;
202 /* release the array mutex */
203 MUTEX_UNLOCK(&agent_array_mutex);
205 /* return the agent blk */
// Return the agent blk in slot idx with a reference taken via
// acquire_agent_blk() while agent_array_mutex is held.
// An out-of-range idx exits before locking; an empty slot exits after
// unlocking. Those return statements are not visible here (presumably NULL —
// confirm). Presumably the caller drops the reference with
// release_agent_blk().
209 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
211 lutf_agent_blk_t *agent;
213 if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
216 /* grab the array mutex */
217 MUTEX_LOCK(&agent_array_mutex);
219 /* if the blk is non null grab the mutex.
220 * possibly block until previous user is done
222 if (agent_live_list[idx] == NULL) {
223 MUTEX_UNLOCK(&agent_array_mutex);
227 agent = agent_live_list[idx];
229 acquire_agent_blk(agent);
231 /* release the array mutex */
232 MUTEX_UNLOCK(&agent_array_mutex);
234 /* return the agent blk */
// OR the flag bits in `state` into agent->state under agent->mutex.
// agent->mutex is initialized with default attributes (pthread_mutex_init
// with NULL attr in find_free_agent_blk()). So this must not be called while
// the caller already holds agent->mutex; relocking a default mutex is not
// defined to succeed.
238 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
240 MUTEX_LOCK(&agent->mutex);
241 agent->state |= state;
242 MUTEX_UNLOCK(&agent->mutex);
// Clear the flag bits in `state` from agent->state under agent->mutex.
// If the agent is then flagged LUTF_AGENT_ZOMBIE with no
// LUTF_AGENT_WORK_IN_PROGRESS, a deletion deferred by free_agent_blk() is
// pending. This function completes it by calling free_agent_blk(agent->id)
// after agent->mutex is dropped, which keeps the array-then-agent lock order.
// The lines that record and test that decision are not visible here.
245 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
249 MUTEX_LOCK(&agent->mutex);
250 agent->state &= ~state;
251 if (!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS) &&
252 (agent->state & LUTF_AGENT_ZOMBIE))
254 MUTEX_UNLOCK(&agent->mutex);
257 free_agent_blk(agent->id);
260 void free_agent_blk(int id)
262 lutf_agent_blk_t *agent;
264 /* grab the array mutex */
265 MUTEX_LOCK(&agent_array_mutex);
267 /* if the blk is non null grab the mutex.
268 * possibly block until previous user is done
270 if (agent_live_list[id] == NULL) {
271 MUTEX_UNLOCK(&agent_array_mutex);
275 agent = agent_live_list[id];
277 MUTEX_LOCK(&agent->mutex);
278 if (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) {
279 MUTEX_UNLOCK(&agent->mutex);
280 MUTEX_UNLOCK(&agent_array_mutex);
281 PDEBUG("delay deleting agent %s\n", agent->name);
282 set_agent_state(agent, LUTF_AGENT_ZOMBIE);
285 MUTEX_UNLOCK(&agent->mutex);
287 agent_live_list[id] = NULL;
289 /* release the array mutex */
290 MUTEX_UNLOCK(&agent_array_mutex);
// Return the agent's IPv4 address as a dotted-quad string via inet_ntoa().
// inet_ntoa() returns a static buffer that its next call overwrites.
// The lines before the return are not visible here (presumably a NULL guard).
296 char *agent_ip2str(lutf_agent_blk_t *agent)
301 return inet_ntoa(agent->addr.sin_addr);
// Count the occupied (non-NULL) slots in agent_live_list.
// NOTE(review): no agent_array_mutex lock is visible here, unlike the other
// scans in this file, so the count can race with concurrent insert/free.
304 int get_num_agents(void)
308 for (i = 0; i < MAX_NUM_AGENTS; i++) {
309 if (agent_live_list[i] != NULL)
// Find an agent blk by name while holding agent_array_mutex, without taking a
// reference on it. find_agent_blk_by_name() is the wrapper that does take one.
// A name equal to TEST_ROLE_GRC matches any live agent, presumably the first
// non-NULL slot (the NULL check is on a line not visible here).
// NOTE(review): confirm the hidden lines reset `agent` to NULL when nothing
// matches; otherwise the last slot scanned is returned.
316 /* variant that takes no agent blk reference (agent_array_mutex is still locked) */
317 static lutf_agent_blk_t *find_agent_blk_by_name_nl(char *name)
319 lutf_agent_blk_t *agent;
325 MUTEX_LOCK(&agent_array_mutex);
327 for (i = 0; i < MAX_NUM_AGENTS; i++) {
328 agent = agent_live_list[i];
330 ((strcmp(agent->name, name) == 0) ||
331 (strcmp(name, TEST_ROLE_GRC) == 0))) {
336 MUTEX_UNLOCK(&agent_array_mutex);
338 /* return the agent blk */
// Find an agent blk by name and take a reference on it; callers in this file
// drop it with release_agent_blk(). The NULL check before
// acquire_agent_blk() and the return are on lines not visible here.
// NOTE(review): the reference is taken after find_agent_blk_by_name_nl() has
// released agent_array_mutex, so a concurrent free_agent_blk() can remove the
// blk in between.
342 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
344 lutf_agent_blk_t *agent;
346 agent = find_agent_blk_by_name_nl(name);
348 acquire_agent_blk(agent);
350 /* return the agent blk */
// Find an agent blk by dotted-quad IP string. It takes a reference on the
// match while holding agent_array_mutex.
// NOTE(review): inet_aton()'s return value is not checked in the visible
// code, so a malformed ip compares against an unspecified address.
// NOTE(review): confirm the hidden lines reset `agent` to NULL when no slot
// matches; otherwise the last slot's agent would be acquired and returned.
354 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
356 lutf_agent_blk_t *agent;
358 struct sockaddr_in addr;
363 inet_aton(ip, &addr.sin_addr);
365 /* grab the array mutex */
366 MUTEX_LOCK(&agent_array_mutex);
368 for (i = 0; i < MAX_NUM_AGENTS; i++) {
369 agent = agent_live_list[i];
370 if ((agent) && (agent->addr.sin_addr.s_addr ==
371 addr.sin_addr.s_addr))
376 acquire_agent_blk(agent);
378 /* release the array mutex */
379 MUTEX_UNLOCK(&agent_array_mutex);
381 /* return the agent blk */
// Delete every agent whose node_type differs from `me` and whose time_stamp is
// at least HB_TO*100 seconds older than t->tv_sec.
// Each candidate holds a reference while it is checked. agent_array_mutex is
// released around free_agent_blk(), which takes that mutex itself, and is
// re-taken afterwards.
// NOTE(review): HB_TO*100 looks like it may be a leftover scale factor —
// confirm the intended heartbeat timeout. Because the array mutex is dropped
// mid-scan, the list can change under the loop.
385 void agent_hb_check(struct timeval *t, lutf_type_t me)
387 lutf_agent_blk_t *agent;
390 /* grab the array mutex */
391 MUTEX_LOCK(&agent_array_mutex);
393 for (i = 0; i < MAX_NUM_AGENTS; i++) {
394 agent = agent_live_list[i];
395 if (agent && agent->node_type != me) {
396 acquire_agent_blk(agent);
397 if (t->tv_sec - agent->time_stamp.tv_sec >= HB_TO*100) {
398 int agent_id = agent->id;
399 /* agent didn't send a HB move to dead
402 PERROR("agent %s presumed dead", agent->name);
403 release_agent_blk(agent);
404 MUTEX_UNLOCK(&agent_array_mutex);
405 /* free_agent_blk() grabs the mutex */
406 free_agent_blk(agent_id);
407 MUTEX_LOCK(&agent_array_mutex);
410 release_agent_blk(agent);
414 /* release the array mutex */
415 MUTEX_UNLOCK(&agent_array_mutex);
// Poll the live list until the agents named in the `agents` YAML sequence are
// found, or until `timeout` seconds of wall-clock time (gettimeofday) have
// elapsed. Each lookup takes a reference via find_agent_blk_by_name() and
// drops it immediately.
// Returns:
//   - EN_LUTF_RC_OK immediately when there is nothing to wait for (that test
//     is on a line not visible here),
//   - EN_LUTF_RC_OK when the agents were found,
//   - EN_LUTF_RC_TIMEOUT otherwise.
// How `found` is updated, and any sleep between polls, are on lines not
// visible here.
418 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
420 struct timeval start;
424 lutf_agent_blk_t *agent;
426 gettimeofday(&start, NULL);
427 gettimeofday(&now, NULL);
430 PDEBUG("No agent to wait for");
431 return EN_LUTF_RC_OK;
434 PDEBUG("Start waiting for Agents");
436 while (now.tv_sec - start.tv_sec < timeout && !found) {
438 PDEBUG("Waiting for Agents");
439 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
440 PDEBUG("Looking up: %s", a->cy_valuestring);
441 if (!(agent = find_agent_blk_by_name(a->cy_valuestring))) {
445 PDEBUG("agent %s found\n", agent->name);
446 release_agent_blk(agent);
451 gettimeofday(&now, NULL);
454 return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
// Ask the master at masterIP:masterPort how many agents it knows about.
// It opens a TCP connection, sends an EN_MSG_TYPE_GET_NUM_AGENTS request and
// reads back a lutf_message_hdr_t followed by a lutf_msg_num_agents_query_t.
// On success rc is the reported num_agents; otherwise it is an EN_LUTF_RC_*
// failure code. The error-path jumps and the final return are on lines not
// visible here.
// NOTE(review): hdr_p->type is compared without ntohl(), whereas
// lutf_send_rpc() applies ntohl() to hdr.type/hdr.version. Confirm which byte
// order the master uses for this reply, and for num_agents.
457 int get_num_agents_remote(char *masterIP, int masterPort)
460 lutf_msg_num_agents_query_t msg;
461 lutf_msg_num_agents_query_t *msg_p;
462 lutf_message_hdr_t hdr;
463 lutf_message_hdr_t *hdr_p;
464 int remoteSocket = INVALID_TCP_SOCKET;
// Room for the header plus the num-agents payload that readTcpMessage() below
// reads into this buffer. Sizing it as two headers overflowed the heap
// whenever sizeof(msg) > sizeof(hdr).
466 char *recvBuf = calloc(1, sizeof(hdr) + sizeof(msg));
469 PERROR("out of memory");
470 rc = EN_LUTF_RC_FAIL;
474 if (!inet_aton(masterIP, &addr)) {
475 PERROR("bad master IP = %s", masterIP);
476 rc = EN_LUTF_RC_FAIL;
480 /* in network byte order, convert so we can have a
483 remoteSocket = establishTCPConnection(addr.s_addr,
486 if (remoteSocket < 0) {
487 PERROR("establishTCPConnection failure: %s", lutf_rc2str(remoteSocket));
492 rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
496 rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
497 TCP_READ_TIMEOUT_SEC);
499 PERROR("failed to receive response");
503 hdr_p = (lutf_message_hdr_t *)recvBuf;
504 msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
506 if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
507 PERROR("Unexpected message. Waiting for num agents received %d",
509 rc = EN_LUTF_RC_FAIL;
513 rc = msg_p->num_agents;
516 closeTcpConnection(remoteSocket);
// Send the YAML RPC request `yaml` to the agent named `agent` and wait up to
// `timeout` for its response.
// If the agent's RPC channel is not connected, it is established first.
// LUTF_AGENT_WORK_IN_PROGRESS is set on the agent for the duration of the
// exchange.
// On success the heap-allocated response body (recvBuf) is handed back,
// presumably via *rsp (that assignment is on a line not visible here); the
// caller owns and frees it.
// On failure a DEFAULT_RPC_RSP "failure" document is built with this agent as
// source and the local node as destination. Its hand-off is likewise not
// visible here.
521 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
523 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
524 lutf_agent_blk_t *agent_blk;
526 lutf_message_hdr_t hdr;
527 char *recvBuf = NULL;
530 if (!agent || !yaml || !rsp)
533 msg_size = strlen(yaml) + 1;
535 PDEBUG("sending rpc request\n%s", yaml);
537 agent_blk = find_agent_blk_by_name(agent);
539 PERROR("Can't find agent with name: %s", agent);
540 goto fail_rpc_no_agent;
543 MUTEX_LOCK(&agent_blk->mutex);
544 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
545 MUTEX_UNLOCK(&agent_blk->mutex);
546 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
548 inet_ntoa(agent_blk->addr.sin_addr),
549 agent_blk->listen_port);
550 /* in network byte order, convert so we can have a uniform API */
551 agent_blk->iRpcFd = establishTCPConnection(
552 agent_blk->addr.sin_addr.s_addr,
553 htons(agent_blk->listen_port),
555 if (agent_blk->iRpcFd < 0)
557 set_agent_state(agent_blk,
558 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
560 MUTEX_UNLOCK(&agent_blk->mutex);
563 set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
565 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
566 EN_MSG_TYPE_RPC_REQUEST);
567 if (rc != EN_LUTF_RC_OK) {
568 PERROR("Failed to send rpc message: %s", yaml);
572 /* wait for the response */
573 rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
574 sizeof(hdr), timeout);
575 if (rc != EN_LUTF_RC_OK) {
576 PERROR("Failed to recv rpc header in timeout %d",
581 if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
582 ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
// log in host byte order and in the order the format string names them
583 PERROR("Bad response. version %d, type:%d\n",
584 ntohl(hdr.version), ntohl(hdr.type));
588 recvBuf = calloc(ntohl(hdr.len), 1);
590 PERROR("Failed to allocate buffer to recv rpc response");
594 rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len), timeout);
595 if (rc != EN_LUTF_RC_OK) {
596 PERROR("Failed to recv rpc body in timeout %d", timeout);
601 * once recvBuf is given back to the caller, it's expected that
602 * the caller will manage the memory and free when done. This is
603 * mainly called from python. The SWIG wrapper frees the memory
607 release_agent_blk(agent_blk);
609 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
611 return EN_LUTF_RC_OK;
614 release_agent_blk(agent_blk);
615 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
// Size the buffer from the caller's `agent` string, which is what snprintf()
// formats below. agent_blk must not be dereferenced here: the lookup-failure
// path (goto fail_rpc_no_agent) arrives with agent_blk NULL, and on the other
// failure path its reference has already been dropped.
618 msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent)+
619 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
621 default_rsp = calloc(msg_size, 1);
623 PERROR("Failed to allocate buffer for default response");
626 /* the source for the response would be the agent we sent
627 * to and the destination is me
629 snprintf(default_rsp, msg_size,
630 DEFAULT_RPC_RSP, agent,
631 g_lutf_cfg.l_info.hb_info.node_name);
// Send `yaml` as an EN_MSG_TYPE_RPC_RESPONSE on the named agent's existing RPC
// channel (iRpcFd).
// It logs an error and fails if the agent is unknown or has no connected RPC
// channel; unlike lutf_send_rpc(), it never establishes the channel.
// rc starts as EN_LUTF_RC_RPC_FAIL and takes lutf_send_msg()'s result. The
// early returns and the final return are on lines not visible here.
// NOTE(review): uses find_agent_blk_by_name_nl(), so no reference is held
// while agent_blk is used below.
638 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
640 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
641 lutf_agent_blk_t *agent_blk;
647 msg_size = strlen(yaml) + 1;
649 agent_blk = find_agent_blk_by_name_nl(agent);
651 PERROR("Can't find agent with name: %s", agent);
655 MUTEX_LOCK(&agent_blk->mutex);
656 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
657 MUTEX_UNLOCK(&agent_blk->mutex);
658 PERROR("agent_blk %s doesn't have an RPC channel",
662 MUTEX_UNLOCK(&agent_blk->mutex);
664 PDEBUG("sending rpc response\n%s", yaml);
665 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
666 EN_MSG_TYPE_RPC_RESPONSE);
// Initialize agent_array_mutex with default attributes. Presumably this must
// run before any other function in this file touches agent_live_list —
// confirm at the call site.
672 void agent_init(void)
674 pthread_mutex_init(&agent_array_mutex, NULL);