1 // SPDX-License-Identifier: GPL-2.0
4 * This file is part of Lustre, http://www.lustre.org/
6 * lustre/tests/lutf/liblutf_agent.c
8 * LUTF agent setup (and some RPC code)
10 * Author: Amir Shehata <ashehata@whamcloud.com>
14 #include <sys/socket.h>
16 #include <netinet/in.h>
17 #include <arpa/inet.h>
24 #include "lutf_agent.h"
26 #include "lutf_python.h"
27 #include "lutf_listener.h"
// Serialises access to agent_live_list and agent_dead_list; initialised by
// agent_init().
29 static pthread_mutex_t agent_array_mutex;
// Agents currently tracked as live. A NULL slot is treated as free by
// find_free_agent_blk().
30 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
// Agents moved off the live list by release_agent_blk() and waiting to be
// released through del_dead_agent_locked().
31 static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
32 /* TODO: this is probably not thread safe */
// Single shared buffer that agent_state2str() formats into and returns.
33 static char agent_state_str[128];
// Heartbeat switch: cleared by agent_disable_hb(), set by agent_enable_hb(),
// read by agent_get_hb().
35 static bool g_agent_enable_hb = true;
36 static struct in_addr g_local_ip;
// Template for the fallback RPC reply built by lutf_send_rpc(); the two
// string fields are the src and dst names.
38 #define DEFAULT_RPC_RSP "rpc:\n src: %s\n dst: %s\n type: failure\n"
// Lock helpers used for both agent_array_mutex and the per-agent mutex.
40 #define MUTEX_LOCK(x) \
43 #define MUTEX_UNLOCK(x) \
44 pthread_mutex_unlock(x)
48 return inet_ntoa(g_local_ip);
// Park an agent in an empty slot of agent_dead_list and set its
// LUTF_AGENT_STATE_DEAD flag. The _locked suffix suggests the caller must
// already hold agent_array_mutex, as release_agent_blk() does - TODO confirm
// for any other caller.
51 static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
55 for (i = 0; i < MAX_NUM_AGENTS; i++) {
56 if (agent_dead_list[i] == NULL) {
57 agent->state |= LUTF_AGENT_STATE_DEAD;
58 agent_dead_list[i] = agent;
// Fires when i reached MAX_NUM_AGENTS, i.e. the dead list had no empty slot
// (assumes the loop stops at the first empty slot it fills).
63 assert(i < MAX_NUM_AGENTS);
// Release path for an agent that sits on the dead list: once ref_count is
// zero the dead-list slot is cleared and the block is overwritten.
// The _locked suffix suggests agent_array_mutex is held by the caller, as
// both visible callers do.
// NOTE(review): the slot is addressed through agent->id, while
// insert_dead_agent_locked() stores the agent at the first empty index;
// confirm that id is kept equal to that index.
66 static void del_dead_agent_locked(lutf_agent_blk_t *agent)
// Sanity check: the agent is flagged dead and occupies its own slot.
69 agent->state & LUTF_AGENT_STATE_DEAD &&
70 agent_dead_list[agent->id] != NULL &&
71 agent_dead_list[agent->id] == agent);
73 assert(agent->ref_count > 0);
// Presumably ref_count is decremented between the assert above and this
// test - TODO confirm.
76 if (agent->ref_count == 0) {
77 agent_dead_list[agent->id] = NULL;
// NOTE(review): memset() converts its fill argument to unsigned char, so
// only the low byte 0xef of 0xdeadbeef is actually written.
78 memset(agent, 0xdeadbeef, sizeof(*agent));
// Walk agent_dead_list under agent_array_mutex. For every entry flagged
// LUTF_AGENT_NEED_LISTEN_CLEAN, clear that flag and pass the agent to
// del_dead_agent_locked().
83 void release_dead_list_agents(void)
87 MUTEX_LOCK(&agent_array_mutex);
88 for (i = 0; i < MAX_NUM_AGENTS; i++) {
89 lutf_agent_blk_t *agent;
91 agent = agent_dead_list[i];
// Empty slots and agents without the cleanup flag are left untouched.
93 if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) {
94 agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
95 del_dead_agent_locked(agent);
98 MUTEX_UNLOCK(&agent_array_mutex);
// Test LUTF_AGENT_STATE_ALIVE on agent->state while holding the per-agent
// mutex. The bool result presumably reflects that test - TODO confirm.
// agent is dereferenced without a NULL check, so callers must pass a valid
// block.
101 static inline bool agent_alive(lutf_agent_blk_t *agent)
105 MUTEX_LOCK(&agent->mutex);
106 if (agent->state & LUTF_AGENT_STATE_ALIVE)
108 MUTEX_UNLOCK(&agent->mutex);
// Give back a reference on an agent block.
// Both agent_array_mutex and the per-agent mutex are taken on entry, always
// in that order, and each visible path drops both again.
//   agent - block to release; must not be NULL (dereferenced immediately)
//   dead  - presumably selects the path that moves a still-referenced agent
//           from the live list to the dead list - TODO confirm
// Visible outcomes:
//   - agent handed to del_dead_agent_locked(), presumably when it is not
//     flagged alive
//   - ref_count reached zero: live slot cleared, connection closed, block
//     overwritten
//   - agent moved to the dead list, its ALIVE / RPC / HB flags cleared and
//     its connection closed
113 void release_agent_blk(lutf_agent_blk_t *agent, int dead)
117 MUTEX_LOCK(&agent_array_mutex);
118 MUTEX_LOCK(&agent->mutex);
120 if (agent->state & LUTF_AGENT_STATE_ALIVE) {
// A live agent must occupy its own slot in agent_live_list.
122 assert(agent_live_list[agent->id] != NULL &&
123 agent_live_list[agent->id] == agent);
// del_dead_agent_locked() runs with the array mutex still held but with the
// per-agent mutex already dropped.
125 MUTEX_UNLOCK(&agent->mutex);
126 del_dead_agent_locked(agent);
127 MUTEX_UNLOCK(&agent_array_mutex);
131 assert(agent->ref_count > 0);
134 if (agent->ref_count == 0) {
135 agent_live_list[agent->id] = NULL;
// Nobody may still be mid-request on a block that is about to go away.
136 assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
// Both locks are dropped before the connection is closed and the block is
// overwritten.
137 MUTEX_UNLOCK(&agent->mutex);
138 MUTEX_UNLOCK(&agent_array_mutex);
139 close_agent_connection(agent);
// NOTE(review): memset() converts its fill argument to unsigned char, so
// only the low byte 0xef of 0xdeadbeef is actually written.
140 memset(agent, 0xdeadbeef, sizeof(*agent));
// Take the agent out of the live list and park it on the dead list.
144 agent_live_list[agent->id] = NULL;
145 insert_dead_agent_locked(agent);
146 MUTEX_UNLOCK(&agent->mutex);
147 MUTEX_UNLOCK(&agent_array_mutex);
// unset_agent_state() takes the per-agent mutex itself, so these calls come
// after the unlocks above.
148 unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
149 unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED);
150 unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED);
151 close_agent_connection(agent);
153 MUTEX_UNLOCK(&agent->mutex);
154 MUTEX_UNLOCK(&agent_array_mutex);
// Take a reference on an agent block. The update is made while holding the
// per-agent mutex; presumably it increments the ref_count that
// release_agent_blk() checks - TODO confirm.
158 void acquire_agent_blk(lutf_agent_blk_t *agent)
160 /* acquire the agent blk mutex */
161 MUTEX_LOCK(&agent->mutex);
164 MUTEX_UNLOCK(&agent->mutex);
// Render the agent state flags as text: alive or dead, followed by HB, RPC
// and WIP markers for each of those flags that is set.
// Returns the literal NULL PARAMETER on the guard path (presumably when
// agent is NULL), otherwise a pointer to agent_state_str.
// NOTE(review): the result lives in the file-scope agent_state_str buffer,
// so every call overwrites the previous result and concurrent callers share
// the same storage.
167 char *agent_state2str(lutf_agent_blk_t *agent)
170 return "NULL PARAMETER";
// The longest possible output is 17 characters plus the terminator, which
// fits the 128-byte buffer.
172 sprintf(agent_state_str, "%s%s%s%s",
173 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
174 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
175 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
176 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
178 return agent_state_str;
// Scan agent_live_list under agent_array_mutex for an alive agent whose IPv4
// address equals addr->sin_addr. On a match a reference is taken with
// acquire_agent_blk() before the array mutex is released.
// Presumably returns the matching block, or NULL when nothing matches -
// TODO confirm.
181 static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
184 lutf_agent_blk_t *agent;
189 MUTEX_LOCK(&agent_array_mutex);
190 for (i = 0; i < MAX_NUM_AGENTS; i++) {
191 agent = agent_live_list[i];
// Only the address is compared; the port in addr is not part of the match.
192 if ((agent) && agent_alive(agent) &&
193 (agent->addr.sin_addr.s_addr ==
194 addr->sin_addr.s_addr)) {
195 acquire_agent_blk(agent);
196 MUTEX_UNLOCK(&agent_array_mutex);
200 MUTEX_UNLOCK(&agent_array_mutex);
// Starting at slot idx, look for the next alive agent in agent_live_list and
// take a reference on it with acquire_agent_blk(). The scan runs under
// agent_array_mutex.
//   idx - first slot to examine
//   out - presumably receives the agent that was found - TODO confirm
// NOTE(review): idx is only checked against the upper bound here, whereas
// find_agent_blk_by_id() also rejects negative indices.
205 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
208 lutf_agent_blk_t *agent = NULL;
210 if (idx >= MAX_NUM_AGENTS)
213 MUTEX_LOCK(&agent_array_mutex);
214 for (i = idx; i < MAX_NUM_AGENTS; i++) {
215 agent = agent_live_list[i];
216 if (agent && agent_alive(agent)) {
218 acquire_agent_blk(agent);
222 MUTEX_UNLOCK(&agent_array_mutex);
// Look up an alive agent by address, falling back to find_free_agent_blk()
// to create one (presumably when the lookup finds nothing).
// For an existing agent, the reference taken by the lookup is given back
// with release_agent_blk(agent, false).
230 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
232 lutf_agent_blk_t *agent;
234 agent = find_agent_blk_by_addr(addr);
236 return find_free_agent_blk(addr);
237 release_agent_blk(agent, false);
// Find the largest descriptor among the iFileDesc and iRpcFd fields of the
// agents in agent_live_list. The running maximum starts at
// INVALID_TCP_SOCKET and the scan runs under agent_array_mutex.
// Presumably returns that maximum - TODO confirm.
242 int lutf_agent_get_highest_fd(void)
244 lutf_agent_blk_t *agent;
245 int iMaxFd = INVALID_TCP_SOCKET;
248 MUTEX_LOCK(&agent_array_mutex);
249 for (i = 0; i < MAX_NUM_AGENTS; i++) {
250 agent = agent_live_list[i];
252 if (agent->iFileDesc > iMaxFd)
253 iMaxFd = agent->iFileDesc;
254 if (agent->iRpcFd > iMaxFd)
255 iMaxFd = agent->iRpcFd;
258 MUTEX_UNLOCK(&agent_array_mutex);
// Clear the module-wide heartbeat enable flag.
263 void agent_disable_hb(void)
265 g_agent_enable_hb = false;
// Set the module-wide heartbeat enable flag.
268 void agent_enable_hb(void)
270 g_agent_enable_hb = true;
// Return the heartbeat enable flag; the bool is converted to int, so the
// result is 1 when enabled and 0 when disabled.
273 int agent_get_hb(void)
275 return g_agent_enable_hb;
// Allocate a zero-filled agent block, initialise it and publish it in a
// NULL slot of agent_live_list. agent_array_mutex is held across the whole
// sequence and dropped on every visible exit path.
// Initial state set here: time_stamp is the current time, both descriptors
// are INVALID_TCP_SOCKET, LUTF_AGENT_STATE_ALIVE is set, and one reference
// is taken through acquire_agent_blk().
// Presumably returns the new block, or NULL when the list is full or the
// allocation fails - TODO confirm.
278 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
281 lutf_agent_blk_t *agent;
283 /* grab the lock for the array */
284 MUTEX_LOCK(&agent_array_mutex);
286 /* iterate through the array to find a free entry */
287 for (i = 0; i < MAX_NUM_AGENTS; i++) {
288 if (agent_live_list[i] == NULL)
// No free slot was found.
292 if (i >= MAX_NUM_AGENTS) {
293 MUTEX_UNLOCK(&agent_array_mutex);
297 /* allocate a new agent blk and assign it to that entry */
// sizeof(char) is 1, so this requests sizeof(lutf_agent_blk_t) zeroed bytes.
298 agent = calloc(sizeof(char),
299 sizeof(lutf_agent_blk_t));
301 MUTEX_UNLOCK(&agent_array_mutex);
305 gettimeofday(&agent->time_stamp, NULL);
307 agent->iFileDesc = INVALID_TCP_SOCKET;
308 agent->iRpcFd = INVALID_TCP_SOCKET;
// NOTE(review): set_agent_state() locks agent->mutex, but
// pthread_mutex_init() for that mutex only runs after this call. Consider
// initialising the mutex first.
310 set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
312 pthread_mutex_init(&agent->mutex, NULL);
313 acquire_agent_blk(agent);
315 /* assign to array */
316 agent_live_list[i] = agent;
318 /* release the array mutex */
319 MUTEX_UNLOCK(&agent_array_mutex);
321 /* return the agent blk */
// Fetch the agent stored in live-list slot idx.
// Indices outside 0 .. MAX_NUM_AGENTS - 1 are rejected before any lock is
// taken, and an empty slot releases the array mutex early.
// A reference is taken with acquire_agent_blk() only when the agent is
// alive. What is returned for an agent that is not alive cannot be told
// from the visible lines - TODO confirm.
325 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
327 lutf_agent_blk_t *agent;
329 if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
332 /* grab the array mutex */
333 MUTEX_LOCK(&agent_array_mutex);
335 /* if the blk is non null grab the mutex.
336 * possibly block until previous user is done
338 if (agent_live_list[idx] == NULL) {
339 MUTEX_UNLOCK(&agent_array_mutex);
343 agent = agent_live_list[idx];
345 if (agent_alive(agent))
346 acquire_agent_blk(agent);
350 /* release the array mutex */
351 MUTEX_UNLOCK(&agent_array_mutex);
353 /* return the agent blk */
// OR the given flag bits into agent->state while holding the per-agent
// mutex. The mutex is taken here, so callers must not already hold it.
357 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
359 MUTEX_LOCK(&agent->mutex);
360 agent->state |= state;
361 MUTEX_UNLOCK(&agent->mutex);
// Clear the given flag bits from agent->state while holding the per-agent
// mutex. The mutex is taken here, so callers must not already hold it.
364 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
366 MUTEX_LOCK(&agent->mutex);
367 agent->state &= ~state;
368 MUTEX_UNLOCK(&agent->mutex);
// Return the dotted-quad text form of the agent IPv4 address.
// NOTE: inet_ntoa() returns a pointer to a statically allocated buffer that
// later inet_ntoa() calls overwrite, so copy the result if it must persist.
371 char *agent_ip2str(lutf_agent_blk_t *agent)
376 return inet_ntoa(agent->addr.sin_addr);
// Count the non-NULL entries of agent_live_list.
// NOTE(review): no MUTEX_LOCK(&agent_array_mutex) is visible around this
// scan, unlike the other list walkers in this file - confirm whether the
// unlocked read is intended.
379 int get_num_agents(void)
384 for (i = 0; i < MAX_NUM_AGENTS; i++) {
385 if (agent_live_list[i] != NULL)
// Search agent_live_list under agent_array_mutex for an alive agent whose
// name equals the requested one, and take a reference on the match with
// acquire_agent_blk().
// When name equals TEST_ROLE_GRC the name comparison is bypassed, so any
// alive agent satisfies the test.
// Callers in this file treat the result as the matching block and pair it
// with release_agent_blk().
392 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
394 lutf_agent_blk_t *agent;
400 MUTEX_LOCK(&agent_array_mutex);
402 for (i = 0; i < MAX_NUM_AGENTS; i++) {
403 agent = agent_live_list[i];
404 if ((agent) && agent_alive(agent) &&
405 ((strcmp(agent->name, name) == 0) ||
406 (strcmp(name, TEST_ROLE_GRC) == 0))) {
407 acquire_agent_blk(agent);
414 MUTEX_UNLOCK(&agent_array_mutex);
416 /* return the agent blk */
// Search agent_live_list under agent_array_mutex for an alive agent whose
// IPv4 address matches the dotted-quad string ip, taking a reference with
// acquire_agent_blk().
// NOTE(review): the inet_aton() result is not checked here, although
// get_num_agents_remote() does check it. With an unparsable ip the
// comparison uses whatever addr.sin_addr already held - confirm addr is
// initialised beforehand, or test the return value.
420 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
422 lutf_agent_blk_t *agent;
424 struct sockaddr_in addr;
429 inet_aton(ip, &addr.sin_addr);
431 /* grab the array mutex */
432 MUTEX_LOCK(&agent_array_mutex);
434 for (i = 0; i < MAX_NUM_AGENTS; i++) {
435 agent = agent_live_list[i];
436 if ((agent) && agent_alive(agent) &&
437 (agent->addr.sin_addr.s_addr ==
438 addr.sin_addr.s_addr))
445 acquire_agent_blk(agent);
447 /* release the array mutex */
448 MUTEX_UNLOCK(&agent_array_mutex);
450 /* return the agent blk */
// Poll for the agents named in a YAML sequence until found is set or
// timeout seconds have passed.
//   agents  - YAML sequence; each item value string is used as an agent name
//   timeout - limit in whole seconds, measured with gettimeofday()
// Returns EN_LUTF_RC_OK on the nothing-to-wait-for path or once found is
// set, otherwise EN_LUTF_RC_TIMEOUT.
// How found is derived (all agents versus any agent) is not established by
// the visible lines - TODO confirm.
454 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
456 struct timeval start;
459 lutf_agent_blk_t *agent;
461 gettimeofday(&start, NULL);
462 gettimeofday(&now, NULL);
465 PDEBUG("No agent to wait for");
466 return EN_LUTF_RC_OK;
469 PDEBUG("Start waiting for Agents");
// Only tv_sec is compared, so the effective resolution is one second.
471 while (now.tv_sec - start.tv_sec < timeout && !found) {
472 struct cYAML *a = NULL;
475 PDEBUG("Waiting for Agents");
476 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
477 PDEBUG("Looking up: %s", a->cy_valuestring);
478 agent = find_agent_blk_by_name(a->cy_valuestring);
// The lookup took a reference; it is only needed for the log line and is
// given straight back.
480 PDEBUG("agent %s found\n", agent->name);
481 release_agent_blk(agent, false);
489 gettimeofday(&now, NULL);
492 return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
// Ask a remote master how many agents it has.
// Steps: parse masterIP, connect with establishTCPConnection(), send an
// EN_MSG_TYPE_GET_NUM_AGENTS message with no payload, read the header plus
// reply within TCP_READ_TIMEOUT_SEC, and check the reply type.
// rc is set to num_agents from the reply on success and to EN_LUTF_RC_FAIL
// when allocation fails, the IP cannot be parsed or the reply type is
// unexpected. Presumably rc is the return value - TODO confirm.
// The socket is closed with closeTcpConnection() at the end.
495 int get_num_agents_remote(char *masterIP, int masterPort)
498 lutf_msg_num_agents_query_t msg;
499 lutf_msg_num_agents_query_t *msg_p;
500 lutf_message_hdr_t hdr;
501 lutf_message_hdr_t *hdr_p;
502 int remoteSocket = INVALID_TCP_SOCKET;
// NOTE(review): the buffer is sized sizeof(hdr) + sizeof(hdr), while the
// read further down requests sizeof(hdr) + sizeof(msg) bytes. The two
// should agree; if msg is ever larger than hdr the read overruns the buffer.
504 char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
507 PERROR("out of memory");
508 rc = EN_LUTF_RC_FAIL;
512 if (!inet_aton(masterIP, &addr)) {
513 PERROR("bad master IP = %s", masterIP);
514 rc = EN_LUTF_RC_FAIL;
518 /* in network byte order, convert so we can have a
521 remoteSocket = establishTCPConnection(addr.s_addr,
524 if (remoteSocket < 0) {
525 PERROR("establishTCPConnection failure: %s",
526 lutf_rc2str(remoteSocket));
531 rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
535 rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
536 TCP_READ_TIMEOUT_SEC);
538 PERROR("failed to receive response");
// The reply is laid out as a header immediately followed by the body.
542 hdr_p = (lutf_message_hdr_t *)recvBuf;
543 msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
// NOTE(review): hdr_p->type is compared without ntohl(), whereas
// lutf_send_rpc() converts hdr.type before comparing - confirm which byte
// order readTcpMessage() delivers.
545 if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
546 PERROR("Unexpected message. Waiting for num agents received %d",
548 rc = EN_LUTF_RC_FAIL;
552 rc = msg_p->num_agents;
555 closeTcpConnection(remoteSocket);
// Send a YAML RPC request to a named agent and wait for the response.
//   agent   - agent name, resolved with find_agent_blk_by_name()
//   yaml    - NUL-terminated request; strlen(yaml) + 1 bytes are sent
//   timeout - passed to readTcpMessage() for the header read
//   rsp     - output pointer; presumably receives recvBuf on success or the
//             generated failure reply otherwise - TODO confirm
// An RPC channel is opened on demand when LUTF_AGENT_RPC_CHANNEL_CONNECTED
// is not set. The agent is flagged LUTF_AGENT_WORK_IN_PROGRESS for the
// duration of the exchange.
// Success path: flag cleared, reference released, EN_LUTF_RC_OK returned.
// Failure path: flag cleared, LUTF_AGENT_NEED_LISTEN_CLEAN set, reference
// released with dead = true, and a reply built from DEFAULT_RPC_RSP.
560 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
562 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
563 lutf_agent_blk_t *agent_blk = NULL;
565 lutf_message_hdr_t hdr;
566 char *recvBuf = NULL;
569 if (!agent || !yaml || !rsp)
// The terminating NUL is sent along with the text.
572 msg_size = strlen(yaml) + 1;
574 PDEBUG("sending rpc request\n%s", yaml);
// The lookup takes a reference that each later path gives back through
// release_agent_blk().
576 agent_blk = find_agent_blk_by_name(agent);
578 PERROR("Can't find agent with name: %s", agent);
579 goto fail_rpc_no_agent;
582 MUTEX_LOCK(&agent_blk->mutex);
583 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
584 MUTEX_UNLOCK(&agent_blk->mutex);
585 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
587 inet_ntoa(agent_blk->addr.sin_addr),
588 agent_blk->listen_port);
589 /* in network byte order, convert so we can have a
592 agent_blk->iRpcFd = establishTCPConnection(
593 agent_blk->addr.sin_addr.s_addr,
594 htons(agent_blk->listen_port),
596 if (agent_blk->iRpcFd < 0)
598 set_agent_state(agent_blk,
599 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
601 MUTEX_UNLOCK(&agent_blk->mutex);
604 set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
606 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
607 EN_MSG_TYPE_RPC_REQUEST);
608 if (rc != EN_LUTF_RC_OK) {
609 PERROR("Failed to send rpc message: %s", yaml);
613 /* wait for the response */
614 rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
615 sizeof(hdr), timeout);
616 if (rc != EN_LUTF_RC_OK) {
617 PERROR("Failed to recv rpc header in timeout %d",
// Header fields arrive in network byte order and are converted before the
// comparison.
622 if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
623 ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
// NOTE(review): the format names version first and type second, but the
// arguments are passed as type then version, and neither goes through
// ntohl() as in the check above.
624 PERROR("Bad response. version %d, type:%d\n",
625 hdr.type, hdr.version);
// NOTE(review): the allocation size comes straight from the peer-supplied
// hdr.len with no upper bound visible here.
629 recvBuf = calloc(ntohl(hdr.len), 1);
631 PERROR("Failed to allocate buffer to recv rpc response");
635 rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
637 if (rc != EN_LUTF_RC_OK) {
638 PERROR("Failed to recv rpc body in timeout %d", timeout);
643 * once recvBuf is given back to the caller, it's expected that
644 * the caller will manage the memory and free when done. This is
645 * mainly called from python. The SWIG wrapper frees the memory
649 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
650 release_agent_blk(agent_blk, false);
652 return EN_LUTF_RC_OK;
655 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
656 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
657 release_agent_blk(agent_blk, true);
// NOTE(review): agent_blk is NULL when the lookup above failed and jumped
// to fail_rpc_no_agent, and it has just been released on the other failure
// path. If either path reaches this line, agent_blk->name is a NULL or
// stale dereference. The snprintf below formats the agent parameter, so
// strlen(agent) would size the buffer without touching agent_blk.
660 msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
661 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
663 default_rsp = calloc(msg_size, 1);
665 PERROR("Failed to allocate buffer for default response");
668 /* the source for the response would be the agent we sent
669 * to and the destination is me
671 snprintf(default_rsp, msg_size,
672 DEFAULT_RPC_RSP, agent,
673 g_lutf_cfg.l_info.hb_info.node_name);
// Send a YAML RPC response to a named agent over its existing RPC channel.
//   agent - agent name, resolved with find_agent_blk_by_name()
//   yaml  - NUL-terminated response; strlen(yaml) + 1 bytes are sent
// Unlike lutf_send_rpc(), no channel is opened here: a missing
// LUTF_AGENT_RPC_CHANNEL_CONNECTED flag is reported through PERROR.
// The agent is flagged LUTF_AGENT_WORK_IN_PROGRESS around the send, and the
// reference from the lookup is given back with release_agent_blk().
// rc starts as EN_LUTF_RC_RPC_FAIL and takes the lutf_send_msg() result.
680 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
682 lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
683 lutf_agent_blk_t *agent_blk;
690 msg_size = strlen(yaml) + 1;
692 agent_blk = find_agent_blk_by_name(agent);
694 PERROR("Can't find agent with name: %s", agent);
// The flag is sampled under the per-agent mutex, which is dropped again
// before any I/O.
698 MUTEX_LOCK(&agent_blk->mutex);
699 if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
700 MUTEX_UNLOCK(&agent_blk->mutex);
701 PERROR("agent_blk %s doesn't have an RPC channel",
705 MUTEX_UNLOCK(&agent_blk->mutex);
707 set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
708 PDEBUG("sending rpc response\n%s", yaml);
709 rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
710 EN_MSG_TYPE_RPC_RESPONSE);
714 unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
715 release_agent_blk(agent_blk, dead);
// NOTE(review): this state change comes after release_agent_blk(), which
// can overwrite the block once its ref_count reaches zero. lutf_send_rpc()
// sets the same flag before releasing - confirm the intended order.
717 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
// Initialise agent_array_mutex with default attributes. The list helpers in
// this file lock that mutex, so this has to run before any of them is used.
722 void agent_init(void)
724 pthread_mutex_init(&agent_array_mutex, NULL);