Whamcloud - gitweb
943f332b44188e0d1b69f69e3a5f2c93587a2efb
[fs/lustre-release.git] / lnet / selftest / console.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/console.c
37  *
38  * Infrastructure of LST console
39  *
40  * Author: Liang Zhen <liangzhen@clusterfs.com>
41  */
42
43 #ifdef __KERNEL__
44
45 #include <libcfs/libcfs.h>
46 #include <lnet/lib-lnet.h>
47 #include "console.h"
48 #include "conrpc.h"
49
/*
 * Account node \a nd in the node-list summary \a p: bump the counter that
 * matches the node's state (active / busy / down, anything else is counted
 * as unknown) and then bump the total node count.
 */
#define LST_NODE_STATE_COUNTER(nd, p)                   \
do {                                                    \
        if ((nd)->nd_state == LST_NODE_ACTIVE) {        \
                (p)->nle_nactive++;                     \
        } else if ((nd)->nd_state == LST_NODE_BUSY) {   \
                (p)->nle_nbusy++;                       \
        } else if ((nd)->nd_state == LST_NODE_DOWN) {   \
                (p)->nle_ndown++;                       \
        } else {                                        \
                (p)->nle_nunknown++;                    \
        }                                               \
        (p)->nle_nnode++;                               \
} while (0)
62
/*
 * The console session state shared by the functions in this file: it holds
 * the global node hash and list (ses_ndl_hash, ses_ndl_list), the group
 * list (ses_grp_list), the batch list (ses_bat_list) and the id cookie
 * (ses_id_cookie) used to number new batches.
 */
lstcon_session_t        console_session;
64
65 void
66 lstcon_node_get(lstcon_node_t *nd)
67 {
68         LASSERT (nd->nd_ref >= 1);
69
70         nd->nd_ref++;
71 }
72
73 static int
74 lstcon_node_find(lnet_process_id_t id, lstcon_node_t **ndpp, int create)
75 {
76         lstcon_ndlink_t *ndl;
77         unsigned int     idx = LNET_NIDADDR(id.nid) % LST_GLOBAL_HASHSIZE;
78
79         LASSERT (id.nid != LNET_NID_ANY);
80
81         list_for_each_entry(ndl, &console_session.ses_ndl_hash[idx], ndl_hlink) {
82                 if (ndl->ndl_node->nd_id.nid != id.nid ||
83                     ndl->ndl_node->nd_id.pid != id.pid)
84                         continue;
85
86                 lstcon_node_get(ndl->ndl_node);
87                 *ndpp = ndl->ndl_node;
88                 return 0;
89         }
90         
91         if (!create)
92                 return -ENOENT;
93
94         LIBCFS_ALLOC(*ndpp, sizeof(lstcon_node_t) + sizeof(lstcon_ndlink_t));
95         if (*ndpp == NULL)
96                 return -ENOMEM;
97
98         ndl = (lstcon_ndlink_t *)(*ndpp + 1);
99
100         ndl->ndl_node = *ndpp;
101
102         ndl->ndl_node->nd_ref   = 1;
103         ndl->ndl_node->nd_id    = id;
104         ndl->ndl_node->nd_stamp = cfs_time_current();
105         ndl->ndl_node->nd_state = LST_NODE_UNKNOWN;
106         ndl->ndl_node->nd_timeout = 0;
107         memset(&ndl->ndl_node->nd_ping, 0, sizeof(lstcon_rpc_t));
108
109         /* queued in global hash & list, no refcount is taken by
110          * global hash & list, if caller release his refcount,
111          * node will be released */
112         list_add_tail(&ndl->ndl_hlink, &console_session.ses_ndl_hash[idx]);
113         list_add_tail(&ndl->ndl_link, &console_session.ses_ndl_list);
114
115         return 0;
116 }
117
118 void
119 lstcon_node_put(lstcon_node_t *nd)
120 {
121         lstcon_ndlink_t  *ndl;
122
123         LASSERT (nd->nd_ref > 0);
124
125         if (--nd->nd_ref > 0)
126                 return;
127
128         ndl = (lstcon_ndlink_t *)(nd + 1);
129
130         LASSERT (!list_empty(&ndl->ndl_link));
131         LASSERT (!list_empty(&ndl->ndl_hlink));
132
133         /* remove from session */
134         list_del(&ndl->ndl_link);
135         list_del(&ndl->ndl_hlink);
136
137         LIBCFS_FREE(nd, sizeof(lstcon_node_t) + sizeof(lstcon_ndlink_t));
138 }
139
140 static int
141 lstcon_ndlink_find(struct list_head *hash,
142                    lnet_process_id_t id, lstcon_ndlink_t **ndlpp, int create)
143 {
144         unsigned int     idx = LNET_NIDADDR(id.nid) % LST_NODE_HASHSIZE;
145         lstcon_ndlink_t *ndl;
146         lstcon_node_t   *nd;
147         int              rc;
148
149         if (id.nid == LNET_NID_ANY)
150                 return -EINVAL;
151
152         /* search in hash */
153         list_for_each_entry(ndl, &hash[idx], ndl_hlink) {
154                 if (ndl->ndl_node->nd_id.nid != id.nid ||
155                     ndl->ndl_node->nd_id.pid != id.pid)
156                         continue;
157
158                 *ndlpp = ndl;
159                 return 0;
160         }
161
162         if (create == 0)
163                 return -ENOENT;
164
165         /* find or create in session hash */
166         rc = lstcon_node_find(id, &nd, (create == 1) ? 1 : 0);
167         if (rc != 0)
168                 return rc;
169
170         LIBCFS_ALLOC(ndl, sizeof(lstcon_ndlink_t));
171         if (ndl == NULL) {
172                 lstcon_node_put(nd);
173                 return -ENOMEM;
174         }
175         
176         *ndlpp = ndl;
177
178         ndl->ndl_node = nd;
179         CFS_INIT_LIST_HEAD(&ndl->ndl_link);
180         list_add_tail(&ndl->ndl_hlink, &hash[idx]);
181
182         return  0;
183 }
184
185 static void
186 lstcon_ndlink_release(lstcon_ndlink_t *ndl)
187 {
188         LASSERT (list_empty(&ndl->ndl_link));
189         LASSERT (!list_empty(&ndl->ndl_hlink));
190
191         list_del(&ndl->ndl_hlink); /* delete from hash */
192         lstcon_node_put(ndl->ndl_node);
193
194         LIBCFS_FREE(ndl, sizeof(*ndl));
195 }
196
197 static int
198 lstcon_group_alloc(char *name, lstcon_group_t **grpp)
199 {
200         lstcon_group_t *grp;
201         int             i;
202
203         LIBCFS_ALLOC(grp, offsetof(lstcon_group_t,
204                                    grp_ndl_hash[LST_NODE_HASHSIZE]));
205         if (grp == NULL)
206                 return -ENOMEM;
207
208         memset(grp, 0, offsetof(lstcon_group_t,
209                                 grp_ndl_hash[LST_NODE_HASHSIZE]));
210
211         grp->grp_ref = 1;
212         if (name != NULL)
213                 strcpy(grp->grp_name, name);
214
215         CFS_INIT_LIST_HEAD(&grp->grp_link);
216         CFS_INIT_LIST_HEAD(&grp->grp_ndl_list);
217         CFS_INIT_LIST_HEAD(&grp->grp_trans_list);
218
219         for (i = 0; i < LST_NODE_HASHSIZE; i++)
220                 CFS_INIT_LIST_HEAD(&grp->grp_ndl_hash[i]);
221
222         *grpp = grp;
223
224         return 0;
225 }
226
227 static void
228 lstcon_group_addref(lstcon_group_t *grp)
229 {
230         grp->grp_ref ++;
231 }
232
233 static void lstcon_group_ndlink_release(lstcon_group_t *, lstcon_ndlink_t *);
234
235 static void
236 lstcon_group_drain(lstcon_group_t *grp, int keep)
237 {
238         lstcon_ndlink_t *ndl;
239         lstcon_ndlink_t *tmp;
240
241         list_for_each_entry_safe(ndl, tmp, &grp->grp_ndl_list, ndl_link) {
242                 if ((ndl->ndl_node->nd_state & keep) == 0)
243                         lstcon_group_ndlink_release(grp, ndl);
244         }
245 }
246
247 static void
248 lstcon_group_decref(lstcon_group_t *grp)
249 {
250         int     i;
251
252         if (--grp->grp_ref > 0)
253                 return;
254
255         if (!list_empty(&grp->grp_link))
256                 list_del(&grp->grp_link);
257
258         lstcon_group_drain(grp, 0);
259
260         for (i = 0; i < LST_NODE_HASHSIZE; i++) {
261                 LASSERT (list_empty(&grp->grp_ndl_hash[i]));
262         }
263
264         LIBCFS_FREE(grp, offsetof(lstcon_group_t,
265                                   grp_ndl_hash[LST_NODE_HASHSIZE]));
266 }
267
268 static int
269 lstcon_group_find(char *name, lstcon_group_t **grpp)
270 {
271         lstcon_group_t   *grp;
272
273         list_for_each_entry(grp, &console_session.ses_grp_list, grp_link) {
274                 if (strncmp(grp->grp_name, name, LST_NAME_SIZE) != 0)
275                         continue;
276
277                 lstcon_group_addref(grp);  /* +1 ref for caller */
278                 *grpp = grp;
279                 return 0;
280         }
281
282         return -ENOENT;
283 }
284
285 static void
286 lstcon_group_put(lstcon_group_t *grp)
287 {
288         lstcon_group_decref(grp);
289 }
290
291 static int
292 lstcon_group_ndlink_find(lstcon_group_t *grp, lnet_process_id_t id,
293                          lstcon_ndlink_t **ndlpp, int create)
294 {
295         int     rc;
296         
297         rc = lstcon_ndlink_find(&grp->grp_ndl_hash[0], id, ndlpp, create);
298         if (rc != 0)
299                 return rc;
300
301         if (!list_empty(&(*ndlpp)->ndl_link))
302                 return 0;
303
304         list_add_tail(&(*ndlpp)->ndl_link, &grp->grp_ndl_list);
305         grp->grp_nnode ++;
306
307         return 0;
308 }
309
310 static void
311 lstcon_group_ndlink_release(lstcon_group_t *grp, lstcon_ndlink_t *ndl)
312 {
313         list_del_init(&ndl->ndl_link);
314         lstcon_ndlink_release(ndl);
315         grp->grp_nnode --;
316 }
317
318 static void
319 lstcon_group_ndlink_move(lstcon_group_t *old,
320                          lstcon_group_t *new, lstcon_ndlink_t *ndl)
321 {
322         unsigned int idx = LNET_NIDADDR(ndl->ndl_node->nd_id.nid) %
323                            LST_NODE_HASHSIZE;
324
325         list_del(&ndl->ndl_hlink);
326         list_del(&ndl->ndl_link);
327         old->grp_nnode --;
328
329         list_add_tail(&ndl->ndl_hlink, &new->grp_ndl_hash[idx]);
330         list_add_tail(&ndl->ndl_link, &new->grp_ndl_list);
331         new->grp_nnode ++;
332
333         return;
334 }
335
336 static void
337 lstcon_group_move(lstcon_group_t *old, lstcon_group_t *new)
338 {
339         lstcon_ndlink_t *ndl;
340
341         while (!list_empty(&old->grp_ndl_list)) {
342                 ndl = list_entry(old->grp_ndl_list.next,
343                                  lstcon_ndlink_t, ndl_link);
344                 lstcon_group_ndlink_move(old, new, ndl);
345         }
346 }
347
348 int
349 lstcon_sesrpc_condition(int transop, lstcon_node_t *nd, void *arg)
350 {
351         lstcon_group_t *grp = (lstcon_group_t *)arg;
352
353         switch (transop) {
354         case LST_TRANS_SESNEW:
355                 if (nd->nd_state == LST_NODE_ACTIVE)
356                         return 0;
357                 break;
358
359         case LST_TRANS_SESEND:
360                 if (nd->nd_state != LST_NODE_ACTIVE)
361                         return 0;
362
363                 if (grp != NULL && nd->nd_ref > 1)
364                         return 0;
365                 break;
366
367         case LST_TRANS_SESQRY:
368                 break;
369
370         default:
371                 LBUG();
372         }
373
374         return 1;
375 }
376
377 int
378 lstcon_sesrpc_readent(int transop, srpc_msg_t *msg,
379                       lstcon_rpc_ent_t *ent_up)
380 {
381         srpc_debug_reply_t *rep;
382
383         switch (transop) {
384         case LST_TRANS_SESNEW:
385         case LST_TRANS_SESEND:
386                 return 0;
387
388         case LST_TRANS_SESQRY:
389                 rep = &msg->msg_body.dbg_reply;
390
391                 if (copy_to_user(&ent_up->rpe_priv[0],
392                                  &rep->dbg_timeout, sizeof(int)) ||
393                     copy_to_user(&ent_up->rpe_payload[0],
394                                  &rep->dbg_name, LST_NAME_SIZE))
395                         return -EFAULT;
396
397                 return 0;
398
399         default:
400                 LBUG();
401         }
402
403         return 0;
404 }
405
406 static int
407 lstcon_group_nodes_add(lstcon_group_t *grp, int count,
408                        lnet_process_id_t *ids_up, struct list_head *result_up)
409 {
410         lstcon_rpc_trans_t      *trans;
411         lstcon_ndlink_t         *ndl;
412         lstcon_group_t          *tmp;
413         lnet_process_id_t        id;
414         int                      i;
415         int                      rc;
416
417         rc = lstcon_group_alloc(NULL, &tmp);
418         if (rc != 0) {
419                 CERROR("Out of memory\n");
420                 return -ENOMEM;
421         }
422
423         for (i = 0 ; i < count; i++) {
424                 if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
425                         rc = -EFAULT;
426                         break;
427                 }
428
429                 /* skip if it's in this group already */
430                 rc = lstcon_group_ndlink_find(grp, id, &ndl, 0);
431                 if (rc == 0)
432                         continue;
433
434                 /* add to tmp group */
435                 rc = lstcon_group_ndlink_find(tmp, id, &ndl, 1);
436                 if (rc != 0) {
437                         CERROR("Can't create ndlink, out of memory\n");
438                         break;
439                 }
440         }
441
442         if (rc != 0) {
443                 lstcon_group_put(tmp);
444                 return rc;
445         }
446
447         rc = lstcon_rpc_trans_ndlist(&tmp->grp_ndl_list,
448                                      &tmp->grp_trans_list, LST_TRANS_SESNEW,
449                                      tmp, lstcon_sesrpc_condition, &trans);
450         if (rc != 0) {
451                 CERROR("Can't create transaction: %d\n", rc);
452                 lstcon_group_put(tmp);
453                 return rc;
454         }
455
456         /* post all RPCs */
457         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
458         
459         rc = lstcon_rpc_trans_interpreter(trans, result_up,
460                                           lstcon_sesrpc_readent);
461         /* destroy all RPGs */
462         lstcon_rpc_trans_destroy(trans);
463
464         lstcon_group_move(tmp, grp);
465         lstcon_group_put(tmp);
466
467         return rc;
468 }
469
470 static int
471 lstcon_group_nodes_remove(lstcon_group_t *grp,
472                           int count, lnet_process_id_t *ids_up,
473                           struct list_head *result_up)
474 {
475         lstcon_rpc_trans_t     *trans;
476         lstcon_ndlink_t        *ndl;
477         lstcon_group_t         *tmp;
478         lnet_process_id_t       id;
479         int                     rc;
480         int                     i;
481
482         /* End session and remove node from the group */
483
484         rc = lstcon_group_alloc(NULL, &tmp);
485         if (rc != 0) {
486                 CERROR("Out of memory\n");
487                 return -ENOMEM;
488         }
489
490         for (i = 0; i < count; i++) {
491                 if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
492                         rc = -EFAULT;
493                         goto error;
494                 }
495                 
496                 /* move node to tmp group */
497                 if (lstcon_group_ndlink_find(grp, id, &ndl, 0) == 0)
498                         lstcon_group_ndlink_move(grp, tmp, ndl);
499         }
500
501         rc = lstcon_rpc_trans_ndlist(&tmp->grp_ndl_list,
502                                      &tmp->grp_trans_list, LST_TRANS_SESEND,
503                                      tmp, lstcon_sesrpc_condition, &trans);
504         if (rc != 0) {
505                 CERROR("Can't create transaction: %d\n", rc);
506                 goto error;
507         }
508
509         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
510
511         rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);
512
513         lstcon_rpc_trans_destroy(trans);
514         /* release nodes anyway, because we can't rollback status */
515         lstcon_group_put(tmp);
516
517         return rc;
518 error:
519         lstcon_group_move(tmp, grp);
520         lstcon_group_put(tmp);
521
522         return rc;
523 }
524
525 int
526 lstcon_group_add(char *name)
527 {
528         lstcon_group_t *grp;
529         int             rc;
530
531         rc = (lstcon_group_find(name, &grp) == 0)? -EEXIST: 0;
532         if (rc != 0) {
533                 /* find a group with same name */
534                 lstcon_group_put(grp);
535                 return rc;
536         }
537
538         rc = lstcon_group_alloc(name, &grp);
539         if (rc != 0) {
540                 CERROR("Can't allocate descriptor for group %s\n", name);
541                 return -ENOMEM;
542         }
543
544         list_add_tail(&grp->grp_link, &console_session.ses_grp_list);
545
546         return rc;
547 }
548
549 int
550 lstcon_nodes_add(char *name, int count,
551                  lnet_process_id_t *ids_up, struct list_head *result_up)
552 {
553         lstcon_group_t         *grp;
554         int                     rc;
555
556         LASSERT (count > 0);
557         LASSERT (ids_up != NULL);
558
559         rc = lstcon_group_find(name, &grp);
560         if (rc != 0) {
561                 CDEBUG(D_NET, "Can't find group %s\n", name);
562                 return rc;
563         }
564
565         if (grp->grp_ref > 2) {
566                 /* referred by other threads or test */
567                 CDEBUG(D_NET, "Group %s is busy\n", name);
568                 lstcon_group_put(grp);
569
570                 return -EBUSY;
571         }
572
573         rc = lstcon_group_nodes_add(grp, count, ids_up, result_up);
574
575         lstcon_group_put(grp);
576
577         return rc;
578 }
579
580 int
581 lstcon_group_del(char *name)
582 {
583         lstcon_rpc_trans_t *trans;
584         lstcon_group_t     *grp;
585         int                 rc;
586
587         rc = lstcon_group_find(name, &grp);
588         if (rc != 0) {
589                 CDEBUG(D_NET, "Can't find group: %s\n", name);
590                 return rc;
591         }
592
593         if (grp->grp_ref > 2) {
594                 /* referred by others threads or test */
595                 CDEBUG(D_NET, "Group %s is busy\n", name);
596                 lstcon_group_put(grp);
597                 return -EBUSY;
598         }
599
600         rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
601                                      &grp->grp_trans_list, LST_TRANS_SESEND,
602                                      grp, lstcon_sesrpc_condition, &trans);
603         if (rc != 0) {
604                 CERROR("Can't create transaction: %d\n", rc);
605                 lstcon_group_put(grp);
606                 return rc;
607         }
608
609         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
610
611         lstcon_rpc_trans_destroy(trans);
612
613         lstcon_group_put(grp);
614         /* -ref for session, it's destroyed,
615          * status can't be rolled back, destroy group anway */
616         lstcon_group_put(grp);
617
618         return rc;
619 }
620
621 int
622 lstcon_group_clean(char *name, int args)
623 {
624         lstcon_group_t *grp = NULL;
625         int             rc;
626
627         rc = lstcon_group_find(name, &grp);
628         if (rc != 0) {
629                 CDEBUG(D_NET, "Can't find group %s\n", name);
630                 return rc;
631         }
632
633         if (grp->grp_ref > 2) {
634                 /* referred by test */
635                 CDEBUG(D_NET, "Group %s is busy\n", name);
636                 lstcon_group_put(grp);
637                 return -EBUSY;
638         }
639
640         args = (LST_NODE_ACTIVE | LST_NODE_BUSY |
641                 LST_NODE_DOWN | LST_NODE_UNKNOWN) & ~args;
642
643         lstcon_group_drain(grp, args);
644
645         lstcon_group_put(grp);
646         /* release empty group */
647         if (list_empty(&grp->grp_ndl_list))
648                 lstcon_group_put(grp);
649
650         return 0;
651 }
652
653 int
654 lstcon_nodes_remove(char *name, int count,
655                     lnet_process_id_t *ids_up, struct list_head *result_up)
656 {
657         lstcon_group_t *grp = NULL;
658         int             rc;
659
660         rc = lstcon_group_find(name, &grp);
661         if (rc != 0) {
662                 CDEBUG(D_NET, "Can't find group: %s\n", name);
663                 return rc;
664         }
665
666         if (grp->grp_ref > 2) {
667                 /* referred by test */
668                 CDEBUG(D_NET, "Group %s is busy\n", name);
669                 lstcon_group_put(grp);
670                 return -EBUSY;
671         }
672
673         rc = lstcon_group_nodes_remove(grp, count, ids_up, result_up);
674
675         lstcon_group_put(grp);
676         /* release empty group */
677         if (list_empty(&grp->grp_ndl_list))
678                 lstcon_group_put(grp);
679
680         return rc;
681 }
682
683 int
684 lstcon_group_refresh(char *name, struct list_head *result_up)
685 {
686         lstcon_rpc_trans_t      *trans;
687         lstcon_group_t          *grp;
688         int                      rc;
689
690         rc = lstcon_group_find(name, &grp);
691         if (rc != 0) {
692                 CDEBUG(D_NET, "Can't find group: %s\n", name);
693                 return rc;
694         }
695
696         if (grp->grp_ref > 2) {
697                 /* referred by test */
698                 CDEBUG(D_NET, "Group %s is busy\n", name);
699                 lstcon_group_put(grp);
700                 return -EBUSY;
701         }
702
703         /* re-invite all inactive nodes int the group */
704         rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
705                                      &grp->grp_trans_list, LST_TRANS_SESNEW,
706                                      grp, lstcon_sesrpc_condition, &trans);
707         if (rc != 0) {
708                 /* local error, return */
709                 CDEBUG(D_NET, "Can't create transaction: %d\n", rc);
710                 lstcon_group_put(grp);
711                 return rc;
712         }
713
714         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
715
716         rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);
717
718         lstcon_rpc_trans_destroy(trans);
719         /* -ref for me */
720         lstcon_group_put(grp);
721
722         return rc;
723 }
724
725 int
726 lstcon_group_list(int index, int len, char *name_up)
727 {
728         lstcon_group_t *grp;
729
730         LASSERT (index >= 0);
731         LASSERT (name_up != NULL);
732
733         list_for_each_entry(grp, &console_session.ses_grp_list, grp_link) {
734                 if (index-- == 0) {
735                         return copy_to_user(name_up, grp->grp_name, len) ?
736                                -EFAULT : 0;
737                 }
738         }
739
740         return -ENOENT;
741 }
742
743 static int
744 lstcon_nodes_getent(struct list_head *head, int *index_p,
745                     int *count_p, lstcon_node_ent_t *dents_up)
746 {
747         lstcon_ndlink_t  *ndl;
748         lstcon_node_t    *nd;
749         int               count = 0;
750         int               index = 0;
751
752         LASSERT (index_p != NULL && count_p != NULL);
753         LASSERT (dents_up != NULL);
754         LASSERT (*index_p >= 0);
755         LASSERT (*count_p > 0);
756
757         list_for_each_entry(ndl, head, ndl_link) {
758                 if (index++ < *index_p)
759                         continue;
760
761                 if (count >= *count_p)
762                         break;
763
764                 nd = ndl->ndl_node;
765                 if (copy_to_user(&dents_up[count].nde_id,
766                                  &nd->nd_id, sizeof(nd->nd_id)) ||
767                     copy_to_user(&dents_up[count].nde_state,
768                                  &nd->nd_state, sizeof(nd->nd_state)))
769                         return -EFAULT;
770
771                 count ++;
772         }
773
774         if (index <= *index_p)
775                 return -ENOENT;
776
777         *count_p = count;
778         *index_p = index;
779
780         return 0;
781 }
782
783 int
784 lstcon_group_info(char *name, lstcon_ndlist_ent_t *gents_p,
785                   int *index_p, int *count_p, lstcon_node_ent_t *dents_up)
786 {
787         lstcon_ndlist_ent_t *gentp;
788         lstcon_group_t      *grp;
789         lstcon_ndlink_t     *ndl;
790         int                  rc;
791
792         rc = lstcon_group_find(name, &grp);
793         if (rc != 0) {
794                 CDEBUG(D_NET, "Can't find group %s\n", name);
795                 return rc;
796         }
797
798         if (dents_up != 0) {
799                 /* verbose query */
800                 rc = lstcon_nodes_getent(&grp->grp_ndl_list,
801                                          index_p, count_p, dents_up);
802                 lstcon_group_put(grp);
803
804                 return rc;
805         }
806
807         /* non-verbose query */
808         LIBCFS_ALLOC(gentp, sizeof(lstcon_ndlist_ent_t));
809         if (gentp == NULL) {
810                 CERROR("Can't allocate ndlist_ent\n");
811                 lstcon_group_put(grp);
812
813                 return -ENOMEM;
814         }
815
816         memset(gentp, 0, sizeof(lstcon_ndlist_ent_t));
817
818         list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link)
819                 LST_NODE_STATE_COUNTER(ndl->ndl_node, gentp);
820
821         rc = copy_to_user(gents_p, gentp,
822                           sizeof(lstcon_ndlist_ent_t)) ? -EFAULT: 0;
823
824         LIBCFS_FREE(gentp, sizeof(lstcon_ndlist_ent_t));
825                 
826         lstcon_group_put(grp);
827
828         return 0;
829 }
830
831 int
832 lstcon_batch_find(char *name, lstcon_batch_t **batpp)
833 {
834         lstcon_batch_t   *bat;
835
836         list_for_each_entry(bat, &console_session.ses_bat_list, bat_link) {
837                 if (strncmp(bat->bat_name, name, LST_NAME_SIZE) == 0) {
838                         *batpp = bat;
839                         return 0;
840                 }
841         }
842
843         return -ENOENT;
844 }
845
846 int
847 lstcon_batch_add(char *name)
848 {
849         lstcon_batch_t   *bat;
850         int               i;
851         int               rc;
852
853         rc = (lstcon_batch_find(name, &bat) == 0)? -EEXIST: 0;
854         if (rc != 0) {
855                 CDEBUG(D_NET, "Batch %s already exists\n", name);
856                 return rc;
857         }
858
859         LIBCFS_ALLOC(bat, sizeof(lstcon_batch_t));
860         if (bat == NULL) {
861                 CERROR("Can't allocate descriptor for batch %s\n", name);
862                 return -ENOMEM;
863         }
864
865         LIBCFS_ALLOC(bat->bat_cli_hash,
866                      sizeof(struct list_head) * LST_NODE_HASHSIZE);
867         if (bat->bat_cli_hash == NULL) {
868                 CERROR("Can't allocate hash for batch %s\n", name);
869                 LIBCFS_FREE(bat, sizeof(lstcon_batch_t));
870
871                 return -ENOMEM;
872         }
873
874         LIBCFS_ALLOC(bat->bat_srv_hash,
875                      sizeof(struct list_head) * LST_NODE_HASHSIZE);
876         if (bat->bat_srv_hash == NULL) {
877                 CERROR("Can't allocate hash for batch %s\n", name);
878                 LIBCFS_FREE(bat->bat_cli_hash, LST_NODE_HASHSIZE);
879                 LIBCFS_FREE(bat, sizeof(lstcon_batch_t));
880
881                 return -ENOMEM;
882         }
883
884         strcpy(bat->bat_name, name);
885         bat->bat_hdr.tsb_index = 0;
886         bat->bat_hdr.tsb_id.bat_id = ++console_session.ses_id_cookie;
887
888         bat->bat_ntest = 0;
889         bat->bat_state = LST_BATCH_IDLE;
890
891         CFS_INIT_LIST_HEAD(&bat->bat_cli_list);
892         CFS_INIT_LIST_HEAD(&bat->bat_srv_list);
893         CFS_INIT_LIST_HEAD(&bat->bat_test_list);
894         CFS_INIT_LIST_HEAD(&bat->bat_trans_list);
895
896         for (i = 0; i < LST_NODE_HASHSIZE; i++) {
897                 CFS_INIT_LIST_HEAD(&bat->bat_cli_hash[i]);
898                 CFS_INIT_LIST_HEAD(&bat->bat_srv_hash[i]);
899         }
900
901         list_add_tail(&bat->bat_link, &console_session.ses_bat_list);
902
903         return rc;
904 }
905
906 int
907 lstcon_batch_list(int index, int len, char *name_up)
908 {
909         lstcon_batch_t    *bat;
910
911         LASSERT (name_up != NULL);
912         LASSERT (index >= 0);
913
914         list_for_each_entry(bat, &console_session.ses_bat_list, bat_link) {
915                 if (index-- == 0) {
916                         return copy_to_user(name_up,bat->bat_name, len) ?
917                                -EFAULT: 0;
918                 }
919         }
920
921         return -ENOENT;
922 }
923
924 int
925 lstcon_batch_info(char *name, lstcon_test_batch_ent_t *ent_up, int server,
926                   int testidx, int *index_p, int *ndent_p,
927                   lstcon_node_ent_t *dents_up)
928 {
929         lstcon_test_batch_ent_t *entp;
930         struct list_head        *clilst;
931         struct list_head        *srvlst;
932         lstcon_test_t           *test = NULL;
933         lstcon_batch_t          *bat;
934         lstcon_ndlink_t         *ndl;
935         int                      rc;
936
937         rc = lstcon_batch_find(name, &bat);
938         if (rc != 0) {
939                 CDEBUG(D_NET, "Can't find batch %s\n", name);
940                 return -ENOENT;
941         }
942
943         if (testidx > 0) {
944                 /* query test, test index start from 1 */
945                 list_for_each_entry(test, &bat->bat_test_list, tes_link) {
946                         if (testidx-- == 1)
947                                 break;
948                 }
949
950                 if (testidx > 0) {
951                         CDEBUG(D_NET, "Can't find specified test in batch\n");
952                         return -ENOENT;
953                 }
954         }
955
956         clilst = (test == NULL) ? &bat->bat_cli_list :
957                                   &test->tes_src_grp->grp_ndl_list;
958         srvlst = (test == NULL) ? &bat->bat_srv_list :
959                                   &test->tes_dst_grp->grp_ndl_list;
960
961         if (dents_up != NULL) {
962                 rc = lstcon_nodes_getent((server ? srvlst: clilst),
963                                          index_p, ndent_p, dents_up);
964                 return rc;
965         }
966
967         /* non-verbose query */
968         LIBCFS_ALLOC(entp, sizeof(lstcon_test_batch_ent_t));
969         if (entp == NULL)
970                 return -ENOMEM;
971
972         memset(entp, 0, sizeof(lstcon_test_batch_ent_t));
973
974         if (test == NULL) {
975                 entp->u.tbe_batch.bae_ntest = bat->bat_ntest;
976                 entp->u.tbe_batch.bae_state = bat->bat_state;
977
978         } else {
979
980                 entp->u.tbe_test.tse_type   = test->tes_type;
981                 entp->u.tbe_test.tse_loop   = test->tes_loop;
982                 entp->u.tbe_test.tse_concur = test->tes_concur;
983         }
984
985         list_for_each_entry(ndl, clilst, ndl_link)
986                 LST_NODE_STATE_COUNTER(ndl->ndl_node, &entp->tbe_cli_nle);
987
988         list_for_each_entry(ndl, srvlst, ndl_link)
989                 LST_NODE_STATE_COUNTER(ndl->ndl_node, &entp->tbe_srv_nle);
990
991         rc = copy_to_user(ent_up, entp,
992                           sizeof(lstcon_test_batch_ent_t)) ? -EFAULT : 0;
993
994         LIBCFS_FREE(entp, sizeof(lstcon_test_batch_ent_t));
995
996         return rc;
997 }
998
999 int
1000 lstcon_batrpc_condition(int transop, lstcon_node_t *nd, void *arg)
1001 {
1002         switch (transop) {
1003         case LST_TRANS_TSBRUN:
1004                 if (nd->nd_state != LST_NODE_ACTIVE)
1005                         return -ENETDOWN;
1006                 break;
1007
1008         case LST_TRANS_TSBSTOP:
1009                 if (nd->nd_state != LST_NODE_ACTIVE)
1010                         return 0;
1011                 break;
1012
1013         case LST_TRANS_TSBCLIQRY:
1014         case LST_TRANS_TSBSRVQRY:
1015                 break;
1016         }
1017
1018         return 1;
1019 }
1020
1021 static int
1022 lstcon_batch_op(lstcon_batch_t *bat, int transop, struct list_head *result_up)
1023 {
1024         lstcon_rpc_trans_t *trans;
1025         int                 rc;
1026
1027         rc = lstcon_rpc_trans_ndlist(&bat->bat_cli_list,
1028                                      &bat->bat_trans_list, transop,
1029                                      bat, lstcon_batrpc_condition, &trans);
1030         if (rc != 0) {
1031                 CERROR("Can't create transaction: %d\n", rc);
1032                 return rc;
1033         }
1034
1035         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
1036
1037         rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);
1038
1039         lstcon_rpc_trans_destroy(trans);
1040
1041         return rc;
1042 }
1043
1044 int
1045 lstcon_batch_run(char *name, int timeout, struct list_head *result_up)
1046 {
1047         lstcon_batch_t *bat;
1048         int             rc;
1049
1050         if (lstcon_batch_find(name, &bat) != 0) {
1051                 CDEBUG(D_NET, "Can't find batch %s\n", name);
1052                 return -ENOENT;
1053         }
1054
1055         bat->bat_arg = timeout;
1056
1057         rc = lstcon_batch_op(bat, LST_TRANS_TSBRUN, result_up);
1058
1059         /* mark batch as running if it's started in any node */
1060         if (lstcon_tsbop_stat_success(lstcon_trans_stat(), 0) != 0)
1061                 bat->bat_state = LST_BATCH_RUNNING;
1062
1063         return rc;
1064 }
1065
1066 int
1067 lstcon_batch_stop(char *name, int force, struct list_head *result_up)
1068 {
1069         lstcon_batch_t *bat;
1070         int             rc;
1071
1072         if (lstcon_batch_find(name, &bat) != 0) {
1073                 CDEBUG(D_NET, "Can't find batch %s\n", name);
1074                 return -ENOENT;
1075         }
1076
1077         bat->bat_arg = force;
1078
1079         rc = lstcon_batch_op(bat, LST_TRANS_TSBSTOP, result_up);
1080         
1081         /* mark batch as stopped if all RPCs finished */
1082         if (lstcon_tsbop_stat_failure(lstcon_trans_stat(), 0) == 0)
1083                 bat->bat_state = LST_BATCH_IDLE;
1084
1085         return rc;
1086 }
1087
1088 static void
1089 lstcon_batch_destroy(lstcon_batch_t *bat)
1090 {
1091         lstcon_ndlink_t    *ndl;
1092         lstcon_test_t      *test;
1093         int                 i;
1094
1095         list_del(&bat->bat_link);
1096
1097         while (!list_empty(&bat->bat_test_list)) {
1098                 test = list_entry(bat->bat_test_list.next,
1099                                   lstcon_test_t, tes_link);
1100                 LASSERT (list_empty(&test->tes_trans_list));
1101
1102                 list_del(&test->tes_link);
1103
1104                 lstcon_group_put(test->tes_src_grp);
1105                 lstcon_group_put(test->tes_dst_grp);
1106
1107                 LIBCFS_FREE(test, offsetof(lstcon_test_t,
1108                                            tes_param[test->tes_paramlen]));
1109         }
1110
1111         LASSERT (list_empty(&bat->bat_trans_list));
1112
1113         while (!list_empty(&bat->bat_cli_list)) {
1114                 ndl = list_entry(bat->bat_cli_list.next,
1115                                  lstcon_ndlink_t, ndl_link);
1116                 list_del_init(&ndl->ndl_link);
1117
1118                 lstcon_ndlink_release(ndl);
1119         }
1120
1121         while (!list_empty(&bat->bat_srv_list)) {
1122                 ndl = list_entry(bat->bat_srv_list.next,
1123                                  lstcon_ndlink_t, ndl_link);
1124                 list_del_init(&ndl->ndl_link);
1125
1126                 lstcon_ndlink_release(ndl);
1127         }
1128
1129         for (i = 0; i < LST_NODE_HASHSIZE; i++) {
1130                 LASSERT (list_empty(&bat->bat_cli_hash[i]));
1131                 LASSERT (list_empty(&bat->bat_srv_hash[i]));
1132         }
1133
1134         LIBCFS_FREE(bat->bat_cli_hash,
1135                     sizeof(struct list_head) * LST_NODE_HASHSIZE);
1136         LIBCFS_FREE(bat->bat_srv_hash,
1137                     sizeof(struct list_head) * LST_NODE_HASHSIZE);
1138         LIBCFS_FREE(bat, sizeof(lstcon_batch_t));
1139 }
1140
1141 int
1142 lstcon_testrpc_condition(int transop, lstcon_node_t *nd, void *arg)
1143 {
1144         lstcon_test_t    *test;
1145         lstcon_batch_t   *batch;
1146         lstcon_ndlink_t  *ndl;
1147         struct list_head *hash;
1148         struct list_head *head;
1149
1150         test = (lstcon_test_t *)arg;
1151         LASSERT (test != NULL);
1152
1153         batch = test->tes_batch;
1154         LASSERT (batch != NULL);
1155
1156         if (test->tes_oneside &&
1157             transop == LST_TRANS_TSBSRVADD)
1158                 return 0;
1159
1160         if (nd->nd_state != LST_NODE_ACTIVE)
1161                 return -ENETDOWN;
1162
1163         if (transop == LST_TRANS_TSBCLIADD) {
1164                 hash = batch->bat_cli_hash;
1165                 head = &batch->bat_cli_list;
1166         
1167         } else {
1168                 LASSERT (transop == LST_TRANS_TSBSRVADD);
1169
1170                 hash = batch->bat_srv_hash;
1171                 head = &batch->bat_srv_list;
1172         }
1173
1174         LASSERT (nd->nd_id.nid != LNET_NID_ANY);
1175
1176         if (lstcon_ndlink_find(hash, nd->nd_id, &ndl, 1) != 0)
1177                 return -ENOMEM;
1178
1179         if (list_empty(&ndl->ndl_link))
1180                 list_add_tail(&ndl->ndl_link, head);
1181
1182         return 1;
1183 }
1184
1185 static int
1186 lstcon_test_nodes_add(lstcon_test_t *test, struct list_head *result_up)
1187 {
1188         lstcon_rpc_trans_t     *trans;
1189         lstcon_group_t         *grp;
1190         int                     transop;
1191         int                     rc;
1192
1193         LASSERT (test->tes_src_grp != NULL);
1194         LASSERT (test->tes_dst_grp != NULL);
1195
1196         transop = LST_TRANS_TSBSRVADD;
1197         grp  = test->tes_dst_grp;
1198 again:
1199         rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
1200                                      &test->tes_trans_list, transop,
1201                                      test, lstcon_testrpc_condition, &trans);
1202         if (rc != 0) {
1203                 CERROR("Can't create transaction: %d\n", rc);
1204                 return rc;
1205         }
1206
1207         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
1208
1209         if (lstcon_trans_stat()->trs_rpc_errno != 0 ||
1210             lstcon_trans_stat()->trs_fwk_errno != 0) {
1211                 lstcon_rpc_trans_interpreter(trans, result_up, NULL);
1212
1213                 lstcon_rpc_trans_destroy(trans);
1214                 /* return if any error */
1215                 CDEBUG(D_NET, "Failed to add test %s, "
1216                               "RPC error %d, framework error %d\n",
1217                        transop == LST_TRANS_TSBCLIADD ? "client" : "server",
1218                        lstcon_trans_stat()->trs_rpc_errno,
1219                        lstcon_trans_stat()->trs_fwk_errno);
1220
1221                 return rc;
1222         }
1223
1224         lstcon_rpc_trans_destroy(trans);
1225
1226         if (transop == LST_TRANS_TSBCLIADD)
1227                 return rc;
1228
1229         transop = LST_TRANS_TSBCLIADD;
1230         grp = test->tes_src_grp;
1231         test->tes_cliidx = 0;
1232
1233         /* requests to test clients */
1234         goto again;
1235 }
1236
1237 int
1238 lstcon_test_add(char *name, int type, int loop, int concur,
1239                 int dist, int span, char *src_name, char * dst_name,
1240                 void *param, int paramlen, int *retp, struct list_head *result_up)
1241                 
1242 {
1243         lstcon_group_t  *src_grp = NULL;
1244         lstcon_group_t  *dst_grp = NULL;
1245         lstcon_test_t   *test    = NULL;
1246         lstcon_batch_t  *batch;
1247         int              rc;
1248
1249         rc = lstcon_batch_find(name, &batch);
1250         if (rc != 0) {
1251                 CDEBUG(D_NET, "Can't find batch %s\n", name);
1252                 return rc;
1253         }
1254
1255         if (batch->bat_state != LST_BATCH_IDLE) {
1256                 CDEBUG(D_NET, "Can't change running batch %s\n", name);
1257                 return rc;
1258         }
1259         
1260         rc = lstcon_group_find(src_name, &src_grp);
1261         if (rc != 0) {
1262                 CDEBUG(D_NET, "Can't find group %s\n", src_name);
1263                 goto out;
1264         }
1265
1266         rc = lstcon_group_find(dst_name, &dst_grp);
1267         if (rc != 0) {
1268                 CDEBUG(D_NET, "Can't find group %s\n", dst_name);
1269                 goto out;
1270         }
1271
1272         if (dst_grp->grp_userland)
1273                 *retp = 1;
1274
1275         LIBCFS_ALLOC(test, offsetof(lstcon_test_t, tes_param[paramlen]));
1276         if (!test) {
1277                 CERROR("Can't allocate test descriptor\n");
1278                 rc = -ENOMEM;
1279
1280                 goto out;
1281         }
1282
1283         memset(test, 0, offsetof(lstcon_test_t, tes_param[paramlen]));
1284         test->tes_hdr.tsb_id    = batch->bat_hdr.tsb_id;
1285         test->tes_batch         = batch;
1286         test->tes_type          = type;
1287         test->tes_oneside       = 0; /* TODO */
1288         test->tes_loop          = loop;
1289         test->tes_concur        = concur;
1290         test->tes_stop_onerr    = 1; /* TODO */
1291         test->tes_span          = span;
1292         test->tes_dist          = dist;
1293         test->tes_cliidx        = 0; /* just used for creating RPC */
1294         test->tes_src_grp       = src_grp;
1295         test->tes_dst_grp       = dst_grp;
1296         CFS_INIT_LIST_HEAD(&test->tes_trans_list);
1297
1298         if (param != NULL) {
1299                 test->tes_paramlen = paramlen;
1300                 memcpy(&test->tes_param[0], param, paramlen);
1301         }
1302
1303         rc = lstcon_test_nodes_add(test, result_up);
1304
1305         if (rc != 0)
1306                 goto out;
1307
1308         if (lstcon_trans_stat()->trs_rpc_errno != 0 ||
1309             lstcon_trans_stat()->trs_fwk_errno != 0)
1310                 CDEBUG(D_NET, "Failed to add test %d to batch %s\n", type, name);
1311
1312         /* add to test list anyway, so user can check what's going on */
1313         list_add_tail(&test->tes_link, &batch->bat_test_list);
1314
1315         batch->bat_ntest ++;
1316         test->tes_hdr.tsb_index = batch->bat_ntest;
1317
1318         /*  hold groups so nobody can change them */
1319         return rc;
1320 out:
1321         if (test != NULL)
1322                 LIBCFS_FREE(test, offsetof(lstcon_test_t, tes_param[paramlen]));
1323
1324         if (dst_grp != NULL)
1325                 lstcon_group_put(dst_grp);
1326
1327         if (src_grp != NULL)
1328                 lstcon_group_put(src_grp);
1329
1330         return rc;
1331 }
1332
1333 int
1334 lstcon_test_find(lstcon_batch_t *batch, int idx, lstcon_test_t **testpp)
1335 {
1336         lstcon_test_t *test;
1337
1338         list_for_each_entry(test, &batch->bat_test_list, tes_link) {
1339                 if (idx == test->tes_hdr.tsb_index) {
1340                         *testpp = test;
1341                         return 0;
1342                 }
1343         }
1344
1345         return -ENOENT;
1346 }
1347
1348 int
1349 lstcon_tsbrpc_readent(int transop, srpc_msg_t *msg,
1350                       lstcon_rpc_ent_t *ent_up)
1351 {
1352         srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1353
1354         LASSERT (transop == LST_TRANS_TSBCLIQRY ||
1355                  transop == LST_TRANS_TSBSRVQRY);
1356
1357         /* positive errno, framework error code */
1358         if (copy_to_user(&ent_up->rpe_priv[0],
1359                          &rep->bar_active, sizeof(rep->bar_active)))
1360                 return -EFAULT;
1361
1362         return 0;
1363 }
1364
1365 int
1366 lstcon_test_batch_query(char *name, int testidx, int client,
1367                         int timeout, struct list_head *result_up)
1368 {
1369         lstcon_rpc_trans_t *trans;
1370         struct list_head   *translist;
1371         struct list_head   *ndlist;
1372         lstcon_tsb_hdr_t   *hdr;
1373         lstcon_batch_t     *batch;
1374         lstcon_test_t      *test = NULL;
1375         int                 transop;
1376         int                 rc;
1377
1378         rc = lstcon_batch_find(name, &batch);
1379         if (rc != 0) {
1380                 CDEBUG(D_NET, "Can't find batch: %s\n", name);
1381                 return rc;
1382         }
1383
1384         if (testidx == 0) {
1385                 translist = &batch->bat_trans_list;
1386                 ndlist    = &batch->bat_cli_list;
1387                 hdr       = &batch->bat_hdr;
1388
1389         } else {
1390                 /* query specified test only */
1391                 rc = lstcon_test_find(batch, testidx, &test);
1392                 if (rc != 0) {
1393                         CDEBUG(D_NET, "Can't find test: %d\n", testidx);
1394                         return rc;
1395                 }
1396         
1397                 translist = &test->tes_trans_list;
1398                 ndlist    = &test->tes_src_grp->grp_ndl_list;
1399                 hdr       = &test->tes_hdr;
1400         } 
1401
1402         transop = client ? LST_TRANS_TSBCLIQRY : LST_TRANS_TSBSRVQRY;
1403
1404         rc = lstcon_rpc_trans_ndlist(ndlist, translist, transop, hdr,
1405                                      lstcon_batrpc_condition, &trans);
1406         if (rc != 0) {
1407                 CERROR("Can't create transaction: %d\n", rc);
1408                 return rc;
1409         }
1410
1411         lstcon_rpc_trans_postwait(trans, timeout);
1412
1413         if (testidx == 0 && /* query a batch, not a test */
1414             lstcon_rpc_stat_failure(lstcon_trans_stat(), 0) == 0 &&
1415             lstcon_tsbqry_stat_run(lstcon_trans_stat(), 0) == 0) {
1416                 /* all RPCs finished, and no active test */
1417                 batch->bat_state = LST_BATCH_IDLE;
1418         }
1419
1420         rc = lstcon_rpc_trans_interpreter(trans, result_up,
1421                                           lstcon_tsbrpc_readent);
1422         lstcon_rpc_trans_destroy(trans);
1423
1424         return rc;
1425 }
1426
1427 int
1428 lstcon_statrpc_readent(int transop, srpc_msg_t *msg,
1429                        lstcon_rpc_ent_t *ent_up)
1430 {
1431         srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1432         sfw_counters_t    *sfwk_stat;
1433         srpc_counters_t   *srpc_stat;
1434         lnet_counters_t   *lnet_stat;
1435         
1436         if (rep->str_status != 0)
1437                 return 0;
1438
1439         sfwk_stat = (sfw_counters_t *)&ent_up->rpe_payload[0];
1440         srpc_stat = (srpc_counters_t *)((char *)sfwk_stat + sizeof(*sfwk_stat));
1441         lnet_stat = (lnet_counters_t *)((char *)srpc_stat + sizeof(*srpc_stat));
1442
1443         if (copy_to_user(sfwk_stat, &rep->str_fw, sizeof(*sfwk_stat)) ||
1444             copy_to_user(srpc_stat, &rep->str_rpc, sizeof(*srpc_stat)) ||
1445             copy_to_user(lnet_stat, &rep->str_lnet, sizeof(*lnet_stat)))
1446                 return -EFAULT;
1447
1448         return 0;
1449 }
1450
1451 int
1452 lstcon_ndlist_stat(struct list_head *ndlist,
1453                    int timeout, struct list_head *result_up)
1454 {
1455         struct list_head    head;
1456         lstcon_rpc_trans_t *trans;
1457         int                 rc;
1458
1459         CFS_INIT_LIST_HEAD(&head);
1460
1461         rc = lstcon_rpc_trans_ndlist(ndlist, &head,
1462                                      LST_TRANS_STATQRY, NULL, NULL, &trans);
1463         if (rc != 0) {
1464                 CERROR("Can't create transaction: %d\n", rc);
1465                 return rc;
1466         }
1467
1468         timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout :
1469                                                       LST_TRANS_MIN_TIMEOUT;
1470         lstcon_rpc_trans_postwait(trans, timeout);
1471
1472         rc = lstcon_rpc_trans_interpreter(trans, result_up,
1473                                           lstcon_statrpc_readent);
1474         lstcon_rpc_trans_destroy(trans);
1475
1476         return rc;
1477 }
1478
1479 int
1480 lstcon_group_stat(char *grp_name, int timeout, struct list_head *result_up)
1481 {
1482         lstcon_group_t     *grp;
1483         int                 rc;
1484
1485         rc = lstcon_group_find(grp_name, &grp);
1486         if (rc != 0) {
1487                 CDEBUG(D_NET, "Can't find group %s\n", grp_name);
1488                 return rc;
1489         }
1490
1491         rc = lstcon_ndlist_stat(&grp->grp_ndl_list, timeout, result_up);
1492
1493         lstcon_group_put(grp);
1494
1495         return rc;
1496 }
1497
1498 int
1499 lstcon_nodes_stat(int count, lnet_process_id_t *ids_up,
1500                   int timeout, struct list_head *result_up)
1501 {
1502         lstcon_ndlink_t         *ndl;
1503         lstcon_group_t          *tmp;
1504         lnet_process_id_t        id;
1505         int                      i;
1506         int                      rc;
1507
1508         rc = lstcon_group_alloc(NULL, &tmp);
1509         if (rc != 0) {
1510                 CERROR("Out of memory\n");
1511                 return -ENOMEM;
1512         }
1513
1514         for (i = 0 ; i < count; i++) {
1515                 if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
1516                         rc = -EFAULT;
1517                         break;
1518                 }
1519
1520                 /* add to tmp group */
1521                 rc = lstcon_group_ndlink_find(tmp, id, &ndl, 2);
1522                 if (rc != 0) {
1523                         CDEBUG((rc == -ENOMEM) ? D_ERROR : D_NET,
1524                                "Failed to find or create %s: %d\n",
1525                                libcfs_id2str(id), rc);
1526                         break;
1527                 }
1528         }
1529
1530         if (rc != 0) {
1531                 lstcon_group_put(tmp);
1532                 return rc;
1533         }
1534
1535         rc = lstcon_ndlist_stat(&tmp->grp_ndl_list, timeout, result_up);
1536
1537         lstcon_group_put(tmp);
1538
1539         return rc;
1540 }
1541
1542 int
1543 lstcon_debug_ndlist(struct list_head *ndlist,
1544                     struct list_head *translist,
1545                     int timeout, struct list_head *result_up)
1546 {
1547         lstcon_rpc_trans_t *trans;
1548         int                 rc;
1549
1550         rc = lstcon_rpc_trans_ndlist(ndlist, translist, LST_TRANS_SESQRY,
1551                                      NULL, lstcon_sesrpc_condition, &trans);
1552         if (rc != 0) {
1553                 CERROR("Can't create transaction: %d\n", rc);
1554                 return rc;
1555         }
1556
1557         timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout :
1558                                                       LST_TRANS_MIN_TIMEOUT;
1559
1560         lstcon_rpc_trans_postwait(trans, timeout);
1561
1562         rc = lstcon_rpc_trans_interpreter(trans, result_up,
1563                                           lstcon_sesrpc_readent);
1564         lstcon_rpc_trans_destroy(trans);
1565
1566         return rc;
1567 }
1568
1569 int
1570 lstcon_session_debug(int timeout, struct list_head *result_up)
1571 {
1572         return lstcon_debug_ndlist(&console_session.ses_ndl_list,
1573                                    NULL, timeout, result_up);
1574 }
1575
1576 int
1577 lstcon_batch_debug(int timeout, char *name,
1578                    int client, struct list_head *result_up)
1579 {
1580         lstcon_batch_t *bat;
1581         int             rc;
1582
1583         rc = lstcon_batch_find(name, &bat);
1584         if (rc != 0)
1585                 return -ENOENT;
1586
1587         rc = lstcon_debug_ndlist(client ? &bat->bat_cli_list :
1588                                           &bat->bat_srv_list,
1589                                  NULL, timeout, result_up);
1590
1591         return rc;
1592 }
1593
1594 int
1595 lstcon_group_debug(int timeout, char *name,
1596                    struct list_head *result_up)
1597 {
1598         lstcon_group_t *grp;
1599         int             rc;
1600
1601         rc = lstcon_group_find(name, &grp);
1602         if (rc != 0)
1603                 return -ENOENT;
1604
1605         rc = lstcon_debug_ndlist(&grp->grp_ndl_list, NULL,
1606                                  timeout, result_up);
1607         lstcon_group_put(grp);
1608
1609         return rc;
1610 }
1611
1612 int
1613 lstcon_nodes_debug(int timeout,
1614                    int count, lnet_process_id_t *ids_up, 
1615                    struct list_head *result_up)
1616 {
1617         lnet_process_id_t  id;
1618         lstcon_ndlink_t   *ndl;
1619         lstcon_group_t    *grp;
1620         int                i;
1621         int                rc;
1622
1623         rc = lstcon_group_alloc(NULL, &grp);
1624         if (rc != 0) {
1625                 CDEBUG(D_NET, "Out of memory\n");
1626                 return rc;
1627         }
1628
1629         for (i = 0; i < count; i++) {
1630                 if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
1631                         rc = -EFAULT;
1632                         break;
1633                 }
1634
1635                 /* node is added to tmp group */
1636                 rc = lstcon_group_ndlink_find(grp, id, &ndl, 1);
1637                 if (rc != 0) {
1638                         CERROR("Can't create node link\n");
1639                         break;
1640                 }
1641         }
1642
1643         if (rc != 0) {
1644                 lstcon_group_put(grp);
1645                 return rc;
1646         }
1647
1648         rc = lstcon_debug_ndlist(&grp->grp_ndl_list, NULL,
1649                                  timeout, result_up);
1650
1651         lstcon_group_put(grp);
1652
1653         return rc;
1654 }
1655
1656 int
1657 lstcon_session_match(lst_sid_t sid)
1658 {
1659         return (console_session.ses_id.ses_nid   == sid.ses_nid &&
1660                 console_session.ses_id.ses_stamp == sid.ses_stamp) ?  1: 0;
1661 }
1662
1663 static void
1664 lstcon_new_session_id(lst_sid_t *sid)
1665 {
1666         lnet_process_id_t      id;
1667
1668         LASSERT (console_session.ses_state == LST_SESSION_NONE);
1669
1670         LNetGetId(1, &id);
1671         sid->ses_nid   = id.nid;
1672         sid->ses_stamp = cfs_time_current();
1673 }
1674
1675 extern srpc_service_t lstcon_acceptor_service;
1676
1677 int
1678 lstcon_session_new(char *name, int key,
1679                    int timeout,int force, lst_sid_t *sid_up)
1680 {
1681         int     rc = 0;
1682         int     i;
1683
1684         if (console_session.ses_state != LST_SESSION_NONE) {
1685                 /* session exists */
1686                 if (!force) {
1687                         CERROR("Session %s already exists\n",
1688                                console_session.ses_name);
1689                         return -EEXIST;
1690                 }
1691
1692                 rc = lstcon_session_end();
1693
1694                 /* lstcon_session_end() only return local error */
1695                 if  (rc != 0)
1696                         return rc;
1697         }
1698
1699         for (i = 0; i < LST_GLOBAL_HASHSIZE; i++) {
1700                 LASSERT (list_empty(&console_session.ses_ndl_hash[i]));
1701         }
1702
1703         rc = lstcon_batch_add(LST_DEFAULT_BATCH);
1704         if (rc != 0)
1705                 return rc;
1706
1707         rc = lstcon_rpc_pinger_start();
1708         if (rc != 0) {
1709                 lstcon_batch_t *bat;
1710
1711                 lstcon_batch_find(LST_DEFAULT_BATCH, &bat);
1712                 lstcon_batch_destroy(bat);
1713
1714                 return rc;
1715         }
1716
1717         lstcon_new_session_id(&console_session.ses_id);
1718
1719         console_session.ses_key     = key;
1720         console_session.ses_state   = LST_SESSION_ACTIVE;
1721         console_session.ses_force   = !!force;
1722         console_session.ses_timeout = (timeout <= 0)? LST_CONSOLE_TIMEOUT:
1723                                                       timeout;
1724         strcpy(console_session.ses_name, name);
1725
1726         if (copy_to_user(sid_up, &console_session.ses_id,
1727                          sizeof(lst_sid_t)) == 0)
1728                 return rc;
1729
1730         lstcon_session_end();
1731
1732         return -EFAULT;
1733 }
1734
1735 int
1736 lstcon_session_info(lst_sid_t *sid_up, int *key_up,
1737                     lstcon_ndlist_ent_t *ndinfo_up, char *name_up, int len)
1738 {
1739         lstcon_ndlist_ent_t *entp;
1740         lstcon_ndlink_t     *ndl;
1741         int                  rc = 0;
1742         
1743         if (console_session.ses_state != LST_SESSION_ACTIVE)
1744                 return -ESRCH;
1745
1746         LIBCFS_ALLOC(entp, sizeof(*entp));
1747         if (entp == NULL)
1748                 return -ENOMEM;
1749
1750         memset(entp, 0, sizeof(*entp));
1751
1752         list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link)
1753                 LST_NODE_STATE_COUNTER(ndl->ndl_node, entp);
1754
1755         if (copy_to_user(sid_up, &console_session.ses_id, sizeof(lst_sid_t)) ||
1756             copy_to_user(key_up, &console_session.ses_key, sizeof(int)) ||
1757             copy_to_user(ndinfo_up, entp, sizeof(*entp)) ||
1758             copy_to_user(name_up, console_session.ses_name, len))
1759                 rc = -EFAULT;
1760
1761         LIBCFS_FREE(entp, sizeof(*entp));
1762
1763         return rc;
1764 }
1765
1766 int
1767 lstcon_session_end()
1768 {
1769         lstcon_rpc_trans_t *trans;
1770         lstcon_group_t     *grp;
1771         lstcon_batch_t     *bat;
1772         int                 rc = 0;
1773
1774         LASSERT (console_session.ses_state == LST_SESSION_ACTIVE);
1775
1776         rc = lstcon_rpc_trans_ndlist(&console_session.ses_ndl_list, 
1777                                      NULL, LST_TRANS_SESEND, NULL,
1778                                      lstcon_sesrpc_condition, &trans);
1779         if (rc != 0) {
1780                 CERROR("Can't create transaction: %d\n", rc);
1781                 return rc;
1782         }
1783
1784         console_session.ses_shutdown = 1;
1785
1786         lstcon_rpc_pinger_stop();
1787
1788         lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);
1789
1790         lstcon_rpc_trans_destroy(trans);
1791         /* User can do nothing even rpc failed, so go on */
1792
1793         /* waiting for orphan rpcs to die */
1794         lstcon_rpc_cleanup_wait();
1795
1796         console_session.ses_id    = LST_INVALID_SID;
1797         console_session.ses_state = LST_SESSION_NONE;
1798         console_session.ses_key   = 0;
1799         console_session.ses_force = 0;
1800
1801         /* destroy all batches */
1802         while (!list_empty(&console_session.ses_bat_list)) {
1803                 bat = list_entry(console_session.ses_bat_list.next,
1804                                  lstcon_batch_t, bat_link);
1805
1806                 lstcon_batch_destroy(bat);
1807         }
1808
1809         /* destroy all groups */
1810         while (!list_empty(&console_session.ses_grp_list)) {
1811                 grp = list_entry(console_session.ses_grp_list.next,
1812                                  lstcon_group_t, grp_link);
1813                 LASSERT (grp->grp_ref == 1);
1814
1815                 lstcon_group_put(grp);
1816         }
1817
1818         /* all nodes should be released */
1819         LASSERT (list_empty(&console_session.ses_ndl_list));
1820
1821         console_session.ses_shutdown = 0;
1822         console_session.ses_expired  = 0;
1823
1824         return rc;
1825 }
1826
1827 static int
1828 lstcon_acceptor_handle (srpc_server_rpc_t *rpc)
1829 {
1830         srpc_msg_t        *rep  = &rpc->srpc_replymsg;
1831         srpc_msg_t        *req  = &rpc->srpc_reqstbuf->buf_msg;
1832         srpc_join_reqst_t *jreq = &req->msg_body.join_reqst;
1833         srpc_join_reply_t *jrep = &rep->msg_body.join_reply;
1834         lstcon_group_t    *grp  = NULL;
1835         lstcon_ndlink_t   *ndl;
1836         int                rc   = 0;
1837
1838         sfw_unpack_message(req);
1839
1840         mutex_down(&console_session.ses_mutex);
1841
1842         jrep->join_sid = console_session.ses_id;
1843
1844         if (console_session.ses_id.ses_nid == LNET_NID_ANY) {
1845                 jrep->join_status = ESRCH;
1846                 goto out;
1847         }
1848
1849         if (jreq->join_sid.ses_nid != LNET_NID_ANY &&
1850              !lstcon_session_match(jreq->join_sid)) {
1851                 jrep->join_status = EBUSY;
1852                 goto out;
1853         }
1854
1855         if (lstcon_group_find(jreq->join_group, &grp) != 0) {
1856                 rc = lstcon_group_alloc(jreq->join_group, &grp);
1857                 if (rc != 0) {
1858                         CERROR("Out of memory\n");
1859                         goto out;
1860                 }
1861
1862                 list_add_tail(&grp->grp_link,
1863                               &console_session.ses_grp_list);
1864                 lstcon_group_addref(grp);
1865         }
1866
1867         if (grp->grp_ref > 2) {
1868                 /* Group in using */
1869                 jrep->join_status = EBUSY;
1870                 goto out;
1871         }
1872
1873         rc = lstcon_group_ndlink_find(grp, rpc->srpc_peer, &ndl, 0);
1874         if (rc == 0) {
1875                 jrep->join_status = EEXIST;
1876                 goto out;
1877         }
1878
1879         rc = lstcon_group_ndlink_find(grp, rpc->srpc_peer, &ndl, 1);
1880         if (rc != 0) {
1881                 CERROR("Out of memory\n");
1882                 goto out;
1883         }
1884
1885         ndl->ndl_node->nd_state   = LST_NODE_ACTIVE;
1886         ndl->ndl_node->nd_timeout = console_session.ses_timeout;
1887
1888         if (grp->grp_userland == 0)
1889                 grp->grp_userland = 1;
1890
1891         strcpy(jrep->join_session, console_session.ses_name);
1892         jrep->join_timeout = console_session.ses_timeout;
1893         jrep->join_status  = 0;
1894
1895 out:
1896         if (grp != NULL)
1897                 lstcon_group_put(grp);
1898
1899         mutex_up(&console_session.ses_mutex);
1900
1901         return rc;
1902 }
1903
1904 srpc_service_t lstcon_acceptor_service =
1905 {
1906         .sv_name        = "join session",
1907         .sv_handler     = lstcon_acceptor_handle,
1908         .sv_bulk_ready  = NULL,
1909         .sv_id          = SRPC_SERVICE_JOIN,
1910         .sv_concur      = SFW_SERVICE_CONCURRENCY,
1911 };
1912
/*
 * Console ioctl entry point.  Only declared here; its definition is not
 * part of this section of the file.
 */
extern int lstcon_ioctl_entry(unsigned int cmd,
                              struct libcfs_ioctl_data *data);

/*
 * Handler object wrapping lstcon_ioctl_entry(); it is handed to
 * libcfs_register_ioctl() / libcfs_deregister_ioctl() below.
 */
DECLARE_IOCTL_HANDLER(lstcon_ioctl_handler, lstcon_ioctl_entry);
1916
1917 /* initialize console */
1918 int
1919 lstcon_console_init(void)
1920 {
1921         int     i;
1922         int     n;
1923         int     rc;
1924
1925         memset(&console_session, 0, sizeof(lstcon_session_t));
1926
1927         console_session.ses_id      = LST_INVALID_SID;
1928         console_session.ses_state   = LST_SESSION_NONE;
1929         console_session.ses_timeout = 0;
1930         console_session.ses_force   = 0;
1931         console_session.ses_expired = 0;
1932         console_session.ses_laststamp = cfs_time_current_sec();   
1933
1934         init_mutex(&console_session.ses_mutex);
1935
1936         CFS_INIT_LIST_HEAD(&console_session.ses_ndl_list);
1937         CFS_INIT_LIST_HEAD(&console_session.ses_grp_list);
1938         CFS_INIT_LIST_HEAD(&console_session.ses_bat_list);
1939         CFS_INIT_LIST_HEAD(&console_session.ses_trans_list);
1940
1941         LIBCFS_ALLOC(console_session.ses_ndl_hash,
1942                      sizeof(struct list_head) * LST_GLOBAL_HASHSIZE);
1943         if (console_session.ses_ndl_hash == NULL)
1944                 return -ENOMEM;
1945
1946         for (i = 0; i < LST_GLOBAL_HASHSIZE; i++)
1947                 CFS_INIT_LIST_HEAD(&console_session.ses_ndl_hash[i]);
1948
1949         rc = srpc_add_service(&lstcon_acceptor_service);
1950         LASSERT (rc != -EBUSY);
1951         if (rc != 0) {
1952                 LIBCFS_FREE(console_session.ses_ndl_hash,
1953                             sizeof(struct list_head) * LST_GLOBAL_HASHSIZE);
1954                 return rc;
1955         }
1956
1957         n = srpc_service_add_buffers(&lstcon_acceptor_service, SFW_POST_BUFFERS);
1958         if (n != SFW_POST_BUFFERS) {
1959                 rc = -ENOMEM;
1960                 goto out;
1961         }
1962
1963         rc = libcfs_register_ioctl(&lstcon_ioctl_handler);
1964
1965         if (rc == 0) {
1966                 lstcon_rpc_module_init();
1967                 return 0;
1968         }
1969
1970 out:
1971         srpc_shutdown_service(&lstcon_acceptor_service);
1972         srpc_remove_service(&lstcon_acceptor_service);
1973
1974         LIBCFS_FREE(console_session.ses_ndl_hash,
1975                     sizeof(struct list_head) * LST_GLOBAL_HASHSIZE);
1976
1977         srpc_wait_service_shutdown(&lstcon_acceptor_service);
1978
1979         return rc;
1980 }
1981
1982 int
1983 lstcon_console_fini(void)
1984 {
1985         int     i;
1986
1987         mutex_down(&console_session.ses_mutex);
1988
1989         libcfs_deregister_ioctl(&lstcon_ioctl_handler);
1990
1991         srpc_shutdown_service(&lstcon_acceptor_service);
1992         srpc_remove_service(&lstcon_acceptor_service);
1993
1994         if (console_session.ses_state != LST_SESSION_NONE) 
1995                 lstcon_session_end();
1996
1997         lstcon_rpc_module_fini();
1998
1999         mutex_up(&console_session.ses_mutex);
2000
2001         LASSERT (list_empty(&console_session.ses_ndl_list));
2002         LASSERT (list_empty(&console_session.ses_grp_list));
2003         LASSERT (list_empty(&console_session.ses_bat_list));
2004         LASSERT (list_empty(&console_session.ses_trans_list));
2005
2006         for (i = 0; i < LST_NODE_HASHSIZE; i++) {
2007                 LASSERT (list_empty(&console_session.ses_ndl_hash[i]));
2008         }
2009
2010         LIBCFS_FREE(console_session.ses_ndl_hash,
2011                     sizeof(struct list_head) * LST_GLOBAL_HASHSIZE);
2012
2013         srpc_wait_service_shutdown(&lstcon_acceptor_service);
2014
2015         return 0;
2016 }
2017
2018 #endif