Whamcloud - gitweb
New tag 2.15.63
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_agent.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * This file is part of Lustre, http://www.lustre.org/
5  *
6  * lustre/tests/lutf/liblutf_agent.c
7  *
8  * LUTF agent setup (and some RPC code)
9  *
10  * Author: Amir Shehata <ashehata@whamcloud.com>
11  *
12  */
13
14 #include <sys/socket.h>
15 #include <sys/time.h>
16 #include <netinet/in.h>
17 #include <arpa/inet.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <unistd.h>
22 #include <assert.h>
23 #include "cyaml.h"
24 #include "lutf_agent.h"
25 #include "lutf.h"
26 #include "lutf_python.h"
27 #include "lutf_listener.h"
28
29 static pthread_mutex_t agent_array_mutex;
30 static lutf_agent_blk_t *agent_live_list[MAX_NUM_AGENTS];
31 static lutf_agent_blk_t *agent_dead_list[MAX_NUM_AGENTS];
32 /* TODO: this is probably not thread safe */
33 static char agent_state_str[128];
34
35 static bool g_agent_enable_hb = true;
36 static struct in_addr g_local_ip;
37
38 #define DEFAULT_RPC_RSP "rpc:\n   src: %s\n   dst: %s\n   type: failure\n"
39
40 #define MUTEX_LOCK(x) \
41   pthread_mutex_lock(x)
42
43 #define MUTEX_UNLOCK(x) \
44   pthread_mutex_unlock(x)
45
46 char *get_local_ip()
47 {
48         return inet_ntoa(g_local_ip);
49 }
50
51 static void insert_dead_agent_locked(lutf_agent_blk_t *agent)
52 {
53         int i = 0;
54
55         for (i = 0; i < MAX_NUM_AGENTS; i++) {
56                 if (agent_dead_list[i] == NULL) {
57                         agent->state |= LUTF_AGENT_STATE_DEAD;
58                         agent_dead_list[i] = agent;
59                         agent->id = i;
60                         break;
61                 }
62         }
63         assert(i < MAX_NUM_AGENTS);
64 }
65
66 static void del_dead_agent_locked(lutf_agent_blk_t *agent)
67 {
68         assert(agent &&
69                agent->state & LUTF_AGENT_STATE_DEAD &&
70                agent_dead_list[agent->id] != NULL &&
71                agent_dead_list[agent->id] == agent);
72
73         assert(agent->ref_count > 0);
74         agent->ref_count--;
75
76         if (agent->ref_count == 0) {
77                 agent_dead_list[agent->id] = NULL;
78                 memset(agent, 0xdeadbeef, sizeof(*agent));
79                 free(agent);
80         }
81 }
82
83 void release_dead_list_agents(void)
84 {
85         int i;
86
87         MUTEX_LOCK(&agent_array_mutex);
88         for (i = 0; i < MAX_NUM_AGENTS; i++) {
89                 lutf_agent_blk_t *agent;
90
91                 agent = agent_dead_list[i];
92
93                 if (agent && (agent->state & LUTF_AGENT_NEED_LISTEN_CLEAN)) {
94                         agent->state &= ~LUTF_AGENT_NEED_LISTEN_CLEAN;
95                         del_dead_agent_locked(agent);
96                 }
97         }
98         MUTEX_UNLOCK(&agent_array_mutex);
99 }
100
101 static inline bool agent_alive(lutf_agent_blk_t *agent)
102 {
103         bool viable = false;
104
105         MUTEX_LOCK(&agent->mutex);
106         if (agent->state & LUTF_AGENT_STATE_ALIVE)
107                 viable = true;
108         MUTEX_UNLOCK(&agent->mutex);
109
110         return viable;
111 }
112
113 void release_agent_blk(lutf_agent_blk_t *agent, int dead)
114 {
115         assert(agent);
116
117         MUTEX_LOCK(&agent_array_mutex);
118         MUTEX_LOCK(&agent->mutex);
119
120         if (agent->state & LUTF_AGENT_STATE_ALIVE) {
121                 /* sanity check */
122                 assert(agent_live_list[agent->id] != NULL &&
123                        agent_live_list[agent->id] == agent);
124         } else {
125                 MUTEX_UNLOCK(&agent->mutex);
126                 del_dead_agent_locked(agent);
127                 MUTEX_UNLOCK(&agent_array_mutex);
128                 return;
129         }
130
131         assert(agent->ref_count > 0);
132         agent->ref_count--;
133
134         if (agent->ref_count == 0) {
135                 agent_live_list[agent->id] = NULL;
136                 assert(!(agent->state & LUTF_AGENT_WORK_IN_PROGRESS));
137                 MUTEX_UNLOCK(&agent->mutex);
138                 MUTEX_UNLOCK(&agent_array_mutex);
139                 close_agent_connection(agent);
140                 memset(agent, 0xdeadbeef, sizeof(*agent));
141                 /* free the block */
142                 free(agent);
143         } else if (dead) {
144                 agent_live_list[agent->id] = NULL;
145                 insert_dead_agent_locked(agent);
146                 MUTEX_UNLOCK(&agent->mutex);
147                 MUTEX_UNLOCK(&agent_array_mutex);
148                 unset_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
149                 unset_agent_state(agent, LUTF_AGENT_RPC_CHANNEL_CONNECTED);
150                 unset_agent_state(agent, LUTF_AGENT_HB_CHANNEL_CONNECTED);
151                 close_agent_connection(agent);
152         } else {
153                 MUTEX_UNLOCK(&agent->mutex);
154                 MUTEX_UNLOCK(&agent_array_mutex);
155         }
156 }
157
158 void acquire_agent_blk(lutf_agent_blk_t *agent)
159 {
160         /* acquire the agent blk mutex */
161         MUTEX_LOCK(&agent->mutex);
162         if (agent)
163                 agent->ref_count++;
164         MUTEX_UNLOCK(&agent->mutex);
165 }
166
167 char *agent_state2str(lutf_agent_blk_t *agent)
168 {
169         if (!agent)
170                 return "NULL PARAMETER";
171
172         sprintf(agent_state_str, "%s%s%s%s",
173                 (agent->state & LUTF_AGENT_STATE_ALIVE) ? "alive " : "dead ",
174                 (agent->state & LUTF_AGENT_HB_CHANNEL_CONNECTED) ? " HB" : "",
175                 (agent->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED) ? " RPC" : "",
176                 (agent->state & LUTF_AGENT_WORK_IN_PROGRESS) ? " WIP" : "");
177
178         return agent_state_str;
179 }
180
181 static lutf_agent_blk_t *find_agent_blk_by_addr(struct sockaddr_in *addr)
182 {
183         int i;
184         lutf_agent_blk_t *agent;
185
186         if (!addr)
187                 return NULL;
188
189         MUTEX_LOCK(&agent_array_mutex);
190         for (i = 0; i < MAX_NUM_AGENTS; i++) {
191                 agent = agent_live_list[i];
192                 if ((agent) && agent_alive(agent) &&
193                     (agent->addr.sin_addr.s_addr ==
194                      addr->sin_addr.s_addr)) {
195                         acquire_agent_blk(agent);
196                         MUTEX_UNLOCK(&agent_array_mutex);
197                         return agent;
198                 }
199         }
200         MUTEX_UNLOCK(&agent_array_mutex);
201
202         return NULL;
203 }
204
205 int get_next_active_agent(int idx, lutf_agent_blk_t **out)
206 {
207         int i = idx;
208         lutf_agent_blk_t *agent = NULL;
209
210         if (idx >= MAX_NUM_AGENTS)
211                 goto out;
212
213         MUTEX_LOCK(&agent_array_mutex);
214         for (i = idx; i < MAX_NUM_AGENTS; i++) {
215                 agent = agent_live_list[i];
216                 if (agent && agent_alive(agent)) {
217                         i++;
218                         acquire_agent_blk(agent);
219                         break;
220                 }
221         }
222         MUTEX_UNLOCK(&agent_array_mutex);
223
224 out:
225         *out = agent;
226
227         return i;
228 }
229
230 lutf_agent_blk_t *find_create_agent_blk_by_addr(struct sockaddr_in *addr)
231 {
232         lutf_agent_blk_t *agent;
233
234         agent = find_agent_blk_by_addr(addr);
235         if (!agent)
236                 return find_free_agent_blk(addr);
237         release_agent_blk(agent, false);
238
239         return agent;
240 }
241
242 int lutf_agent_get_highest_fd(void)
243 {
244         lutf_agent_blk_t *agent;
245         int iMaxFd = INVALID_TCP_SOCKET;
246         int i;
247
248         MUTEX_LOCK(&agent_array_mutex);
249         for (i = 0; i < MAX_NUM_AGENTS; i++) {
250                 agent = agent_live_list[i];
251                 if (agent) {
252                         if (agent->iFileDesc > iMaxFd)
253                                 iMaxFd = agent->iFileDesc;
254                         if (agent->iRpcFd > iMaxFd)
255                                 iMaxFd = agent->iRpcFd;
256                 }
257         }
258         MUTEX_UNLOCK(&agent_array_mutex);
259
260         return iMaxFd;
261 }
262
263 void agent_disable_hb(void)
264 {
265         g_agent_enable_hb = false;
266 }
267
268 void agent_enable_hb(void)
269 {
270         g_agent_enable_hb = true;
271 }
272
273 int agent_get_hb(void)
274 {
275         return g_agent_enable_hb;
276 }
277
278 lutf_agent_blk_t *find_free_agent_blk(struct sockaddr_in *addr)
279 {
280         int i = 0;
281         lutf_agent_blk_t *agent;
282
283         /* grab the lock for the array */
284         MUTEX_LOCK(&agent_array_mutex);
285
286         /* iterate through the array to find a free entry */
287         for (i = 0; i < MAX_NUM_AGENTS; i++) {
288                 if (agent_live_list[i] == NULL)
289                         break;
290         }
291
292         if (i >= MAX_NUM_AGENTS) {
293                 MUTEX_UNLOCK(&agent_array_mutex);
294                 return NULL;
295         }
296
297         /* allocate a new agent blk and assign it to that entry */
298         agent = calloc(sizeof(char),
299                 sizeof(lutf_agent_blk_t));
300         if (!agent) {
301                 MUTEX_UNLOCK(&agent_array_mutex);
302                 return NULL;
303         }
304
305         gettimeofday(&agent->time_stamp, NULL);
306         agent->id = i;
307         agent->iFileDesc = INVALID_TCP_SOCKET;
308         agent->iRpcFd = INVALID_TCP_SOCKET;
309         agent->addr = *addr;
310         set_agent_state(agent, LUTF_AGENT_STATE_ALIVE);
311
312         pthread_mutex_init(&agent->mutex, NULL);
313         acquire_agent_blk(agent);
314
315         /* assign to array */
316         agent_live_list[i] = agent;
317
318         /* release the array mutex */
319         MUTEX_UNLOCK(&agent_array_mutex);
320
321         /* return the agent blk */
322         return agent;
323 }
324
325 lutf_agent_blk_t *find_agent_blk_by_id(int idx)
326 {
327         lutf_agent_blk_t *agent;
328
329         if ((idx < 0) || (idx >= MAX_NUM_AGENTS))
330                 return NULL;
331
332         /* grab the array mutex */
333         MUTEX_LOCK(&agent_array_mutex);
334
335         /* if the blk is non null grab the mutex.
336          * possibly block until previous user is done
337          */
338         if (agent_live_list[idx] == NULL) {
339                 MUTEX_UNLOCK(&agent_array_mutex);
340                 return NULL;
341         }
342
343         agent = agent_live_list[idx];
344
345         if (agent_alive(agent))
346                 acquire_agent_blk(agent);
347         else
348                 agent = NULL;
349
350         /* release the array mutex */
351         MUTEX_UNLOCK(&agent_array_mutex);
352
353         /* return the agent blk */
354         return agent;
355 }
356
357 void set_agent_state(lutf_agent_blk_t *agent, unsigned int state)
358 {
359         MUTEX_LOCK(&agent->mutex);
360         agent->state |= state;
361         MUTEX_UNLOCK(&agent->mutex);
362 }
363
364 void unset_agent_state(lutf_agent_blk_t *agent, unsigned int state)
365 {
366         MUTEX_LOCK(&agent->mutex);
367         agent->state &= ~state;
368         MUTEX_UNLOCK(&agent->mutex);
369 }
370
371 char *agent_ip2str(lutf_agent_blk_t *agent)
372 {
373         if (!agent)
374                 return NULL;
375
376         return inet_ntoa(agent->addr.sin_addr);
377 }
378
379 int get_num_agents(void)
380 {
381         int i;
382         int num = 0;
383
384         for (i = 0; i < MAX_NUM_AGENTS; i++) {
385                 if (agent_live_list[i] != NULL)
386                         num++;
387         }
388
389         return num;
390 }
391
392 lutf_agent_blk_t *find_agent_blk_by_name(char *name)
393 {
394         lutf_agent_blk_t *agent;
395         int i;
396
397         if (!name)
398                 return NULL;
399
400         MUTEX_LOCK(&agent_array_mutex);
401
402         for (i = 0; i < MAX_NUM_AGENTS; i++) {
403                 agent = agent_live_list[i];
404                 if ((agent) && agent_alive(agent) &&
405                     ((strcmp(agent->name, name) == 0) ||
406                      (strcmp(name, TEST_ROLE_GRC) == 0))) {
407                         acquire_agent_blk(agent);
408                         break;
409                 } else {
410                         agent = NULL;
411                 }
412         }
413
414         MUTEX_UNLOCK(&agent_array_mutex);
415
416         /* return the agent blk */
417         return agent;
418 }
419
420 lutf_agent_blk_t *find_agent_blk_by_ip(char *ip)
421 {
422         lutf_agent_blk_t *agent;
423         int i;
424         struct sockaddr_in addr;
425
426         if (!ip)
427                 return NULL;
428
429         inet_aton(ip, &addr.sin_addr);
430
431         /* grab the array mutex */
432         MUTEX_LOCK(&agent_array_mutex);
433
434         for (i = 0; i < MAX_NUM_AGENTS; i++) {
435                 agent = agent_live_list[i];
436                 if ((agent) && agent_alive(agent) &&
437                     (agent->addr.sin_addr.s_addr ==
438                                 addr.sin_addr.s_addr))
439                         break;
440                 else
441                         agent = NULL;
442         }
443
444         if (agent)
445                 acquire_agent_blk(agent);
446
447         /* release the array mutex */
448         MUTEX_UNLOCK(&agent_array_mutex);
449
450         /* return the agent blk */
451         return agent;
452 }
453
454 lutf_rc_t wait_for_agents(struct cYAML *agents, int timeout)
455 {
456         struct timeval start;
457         struct timeval now;
458         bool found = false;
459         lutf_agent_blk_t *agent;
460
461         gettimeofday(&start, NULL);
462         gettimeofday(&now, NULL);
463
464         if (!agents) {
465                 PDEBUG("No agent to wait for");
466                 return EN_LUTF_RC_OK;
467         }
468
469         PDEBUG("Start waiting for Agents");
470
471         while (now.tv_sec - start.tv_sec < timeout && !found) {
472                 struct cYAML *a = NULL;
473
474                 found = true;
475                 PDEBUG("Waiting for Agents");
476                 while (cYAML_get_next_seq_item(agents, &a) != NULL) {
477                         PDEBUG("Looking up: %s", a->cy_valuestring);
478                         agent = find_agent_blk_by_name(a->cy_valuestring);
479                         if (agent) {
480                                 PDEBUG("agent %s found\n", agent->name);
481                                 release_agent_blk(agent, false);
482                         } else {
483                                 found = false;
484                                 break;
485                         }
486                 }
487                 if (!found)
488                         sleep(1);
489                 gettimeofday(&now, NULL);
490         }
491
492         return found ? EN_LUTF_RC_OK : EN_LUTF_RC_TIMEOUT;
493 }
494
495 int get_num_agents_remote(char *masterIP, int masterPort)
496 {
497         lutf_rc_t rc;
498         lutf_msg_num_agents_query_t msg;
499         lutf_msg_num_agents_query_t *msg_p;
500         lutf_message_hdr_t hdr;
501         lutf_message_hdr_t *hdr_p;
502         int remoteSocket = INVALID_TCP_SOCKET;
503         struct in_addr addr;
504         char *recvBuf = calloc(1, sizeof(hdr) + sizeof(hdr));
505
506         if (!recvBuf) {
507                 PERROR("out of memory");
508                 rc = EN_LUTF_RC_FAIL;
509                 goto out;
510         }
511
512         if (!inet_aton(masterIP, &addr)) {
513                 PERROR("bad master IP = %s", masterIP);
514                 rc = EN_LUTF_RC_FAIL;
515                 goto out;
516         }
517
518         /* in network byte order, convert so we can have a
519          * uniform API
520          */
521         remoteSocket = establishTCPConnection(addr.s_addr,
522                                                 htons(masterPort),
523                                                 false, false);
524         if (remoteSocket < 0) {
525                 PERROR("establishTCPConnection failure: %s",
526                        lutf_rc2str(remoteSocket));
527                 rc = remoteSocket;
528                 goto out;
529         }
530
531         rc = lutf_send_msg(remoteSocket, NULL, 0, EN_MSG_TYPE_GET_NUM_AGENTS);
532         if (rc)
533                 goto out;
534
535         rc = readTcpMessage(remoteSocket, recvBuf, sizeof(hdr) + sizeof(msg),
536                             TCP_READ_TIMEOUT_SEC);
537         if (rc) {
538                 PERROR("failed to receive response");
539                 goto out;
540         }
541
542         hdr_p = (lutf_message_hdr_t *)recvBuf;
543         msg_p = (lutf_msg_num_agents_query_t *)(recvBuf + sizeof(hdr));
544
545         if (hdr_p->type != EN_MSG_TYPE_GET_NUM_AGENTS) {
546                 PERROR("Unexpected message. Waiting for num agents received %d",
547                        hdr_p->type);
548                 rc = EN_LUTF_RC_FAIL;
549                 goto out;
550         }
551
552         rc = msg_p->num_agents;
553
554 out:
555         closeTcpConnection(remoteSocket);
556         free(recvBuf);
557         return rc;
558 }
559
560 lutf_rc_t lutf_send_rpc(char *agent, char *yaml, int timeout, char **rsp)
561 {
562         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
563         lutf_agent_blk_t *agent_blk = NULL;
564         char *default_rsp;
565         lutf_message_hdr_t hdr;
566         char *recvBuf = NULL;
567         int msg_size;
568
569         if (!agent || !yaml || !rsp)
570                 goto fail_rpc;
571
572         msg_size = strlen(yaml) + 1;
573
574         PDEBUG("sending rpc request\n%s", yaml);
575
576         agent_blk = find_agent_blk_by_name(agent);
577         if (!agent_blk) {
578                 PERROR("Can't find agent with name: %s", agent);
579                 goto fail_rpc_no_agent;
580         }
581
582         MUTEX_LOCK(&agent_blk->mutex);
583         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
584                 MUTEX_UNLOCK(&agent_blk->mutex);
585                 PDEBUG("Establishing an RPC channel to agent %s:%s:%d",
586                        agent_blk->name,
587                        inet_ntoa(agent_blk->addr.sin_addr),
588                        agent_blk->listen_port);
589                 /* in network byte order, convert so we can have a
590                  * uniform API
591                  */
592                 agent_blk->iRpcFd = establishTCPConnection(
593                                 agent_blk->addr.sin_addr.s_addr,
594                                 htons(agent_blk->listen_port),
595                                 false, false);
596                 if (agent_blk->iRpcFd < 0)
597                         goto fail_rpc;
598                 set_agent_state(agent_blk,
599                                 LUTF_AGENT_RPC_CHANNEL_CONNECTED);
600         } else {
601                 MUTEX_UNLOCK(&agent_blk->mutex);
602         }
603
604         set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
605
606         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
607                            EN_MSG_TYPE_RPC_REQUEST);
608         if (rc != EN_LUTF_RC_OK) {
609                 PERROR("Failed to send rpc message: %s", yaml);
610                 goto fail_rpc;
611         }
612
613         /* wait for the response */
614         rc = readTcpMessage(agent_blk->iRpcFd, (char *)&hdr,
615                             sizeof(hdr), timeout);
616         if (rc != EN_LUTF_RC_OK) {
617                 PERROR("Failed to recv rpc header in timeout %d",
618                        timeout);
619                 goto fail_rpc;
620         }
621
622         if (ntohl(hdr.type) != EN_MSG_TYPE_RPC_RESPONSE ||
623             ntohl(hdr.version) != LUTF_VERSION_NUMBER) {
624                 PERROR("Bad response. version %d, type:%d\n",
625                        hdr.type, hdr.version);
626                 goto fail_rpc;
627         }
628
629         recvBuf = calloc(ntohl(hdr.len), 1);
630         if (!recvBuf) {
631                 PERROR("Failed to allocate buffer to recv rpc response");
632                 goto fail_rpc;
633         }
634
635         rc = readTcpMessage(agent_blk->iRpcFd, recvBuf, ntohl(hdr.len),
636                             timeout);
637         if (rc != EN_LUTF_RC_OK) {
638                 PERROR("Failed to recv rpc body in timeout %d", timeout);
639                 goto fail_rpc;
640         }
641
642         /*
643          * once recvBuf is given back to the caller, it's expected that
644          * the caller will manage the memory and free when done. This is
645          * mainly called from python. The SWIG wrapper frees the memory
646          * appropriately.
647          */
648         *rsp = recvBuf;
649         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
650         release_agent_blk(agent_blk, false);
651
652         return EN_LUTF_RC_OK;
653
654 fail_rpc:
655         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
656         set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
657         release_agent_blk(agent_blk, true);
658         if (recvBuf)
659                 free(recvBuf);
660         msg_size = strlen(DEFAULT_RPC_RSP)+strlen(agent_blk->name)+
661                 strlen(g_lutf_cfg.l_info.hb_info.node_name) + 1;
662 fail_rpc_no_agent:
663         default_rsp = calloc(msg_size, 1);
664         if (!default_rsp) {
665                 PERROR("Failed to allocate buffer for default response");
666                 *rsp = NULL;
667         } else {
668                 /* the source for the response would be the agent we sent
669                  * to and the destination is me
670                  */
671                 snprintf(default_rsp, msg_size,
672                          DEFAULT_RPC_RSP, agent,
673                          g_lutf_cfg.l_info.hb_info.node_name);
674                 *rsp = default_rsp;
675         }
676
677         return rc;
678 }
679
680 lutf_rc_t lutf_send_rpc_rsp(char *agent, char *yaml)
681 {
682         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
683         lutf_agent_blk_t *agent_blk;
684         int msg_size;
685         bool dead = false;
686
687         if (!agent || !yaml)
688                 goto out;
689
690         msg_size = strlen(yaml) + 1;
691
692         agent_blk = find_agent_blk_by_name(agent);
693         if (!agent_blk) {
694                 PERROR("Can't find agent with name: %s", agent);
695                 goto out;
696         }
697
698         MUTEX_LOCK(&agent_blk->mutex);
699         if (!(agent_blk->state & LUTF_AGENT_RPC_CHANNEL_CONNECTED)) {
700                 MUTEX_UNLOCK(&agent_blk->mutex);
701                 PERROR("agent_blk %s doesn't have an RPC channel",
702                        agent_blk->name);
703                 goto release_agent;
704         }
705         MUTEX_UNLOCK(&agent_blk->mutex);
706
707         set_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
708         PDEBUG("sending rpc response\n%s", yaml);
709         rc = lutf_send_msg(agent_blk->iRpcFd, yaml, msg_size,
710                            EN_MSG_TYPE_RPC_RESPONSE);
711         if (rc)
712                 dead = true;
713 release_agent:
714         unset_agent_state(agent_blk, LUTF_AGENT_WORK_IN_PROGRESS);
715         release_agent_blk(agent_blk, dead);
716         if (dead)
717                 set_agent_state(agent_blk, LUTF_AGENT_NEED_LISTEN_CLEAN);
718 out:
719         return rc;
720 }
721
722 void agent_init(void)
723 {
724         pthread_mutex_init(&agent_array_mutex, NULL);
725 }