Whamcloud - gitweb
067c50c68a486a19f05fb510d956e6c732bd3dc0
[fs/lustre-release.git] / lnet / selftest / conrpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/selftest/conrpc.c
39  *
40  * Console framework rpcs
41  *
42  * Author: Liang Zhen <liangzhen@clusterfs.com>
43  */
44
45 #ifdef __KERNEL__
46
47 #include <libcfs/libcfs.h>
48 #include <lnet/lib-lnet.h>
49 #include "timer.h"
50 #include "conrpc.h"
51 #include "console.h"
52
53 void lstcon_rpc_stat_reply(int, srpc_msg_t *,
54                            lstcon_node_t *, lstcon_trans_stat_t *);
55
56 static void
57 lstcon_rpc_done(srpc_client_rpc_t *rpc)
58 {
59         lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;
60
61         LASSERT (crpc != NULL && rpc == crpc->crp_rpc);
62         LASSERT (crpc->crp_posted && !crpc->crp_finished);
63
64         cfs_spin_lock(&rpc->crpc_lock);
65
66         if (crpc->crp_trans == NULL) {
67                 /* Orphan RPC is not in any transaction, 
68                  * I'm just a poor body and nobody loves me */
69                 cfs_spin_unlock(&rpc->crpc_lock);
70
71                 /* release it */
72                 lstcon_rpc_put(crpc);
73                 return;
74         }
75
76         /* not an orphan RPC */
77         crpc->crp_finished = 1;
78
79         if (crpc->crp_stamp == 0) {
80                 /* not aborted */
81                 LASSERT (crpc->crp_status == 0);
82
83                 crpc->crp_stamp  = cfs_time_current();
84                 crpc->crp_status = rpc->crpc_status;
85         }
86
87         /* wakeup (transaction)thread if I'm the last RPC in the transaction */
88         if (cfs_atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
89                 cfs_waitq_signal(&crpc->crp_trans->tas_waitq);
90
91         cfs_spin_unlock(&rpc->crpc_lock);
92 }
93
94 int
95 lstcon_rpc_init(lstcon_node_t *nd, int service,
96                 int npg, int cached, lstcon_rpc_t *crpc)
97 {
98
99         crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service, 
100                                        npg, npg * CFS_PAGE_SIZE,
101                                        lstcon_rpc_done, (void *)crpc);
102         if (crpc->crp_rpc == NULL)
103                 return -ENOMEM;
104
105         crpc->crp_trans    = NULL;
106         crpc->crp_node     = nd;
107         crpc->crp_posted   = 0;
108         crpc->crp_finished = 0;
109         crpc->crp_unpacked = 0;
110         crpc->crp_status   = 0;
111         crpc->crp_stamp    = 0;
112         crpc->crp_static   = !cached;
113         CFS_INIT_LIST_HEAD(&crpc->crp_link);
114
115         cfs_atomic_inc(&console_session.ses_rpc_counter);
116
117         return 0;
118 }
119
120 int
121 lstcon_rpc_prep(lstcon_node_t *nd, int service,
122                 int npg, lstcon_rpc_t **crpcpp)
123 {
124         lstcon_rpc_t  *crpc = NULL;
125         int            rc;
126
127         cfs_spin_lock(&console_session.ses_rpc_lock);
128
129         if (!cfs_list_empty(&console_session.ses_rpc_freelist)) {
130                 crpc = cfs_list_entry(console_session.ses_rpc_freelist.next,
131                                       lstcon_rpc_t, crp_link);
132                 cfs_list_del_init(&crpc->crp_link);
133         }
134
135         cfs_spin_unlock(&console_session.ses_rpc_lock);
136
137         if (crpc == NULL) {
138                 LIBCFS_ALLOC(crpc, sizeof(*crpc));
139                 if (crpc == NULL)
140                         return -ENOMEM;
141         }
142
143         rc = lstcon_rpc_init(nd, service, npg, 1, crpc);
144         if (rc == 0) {
145                 *crpcpp = crpc;
146                 return 0;
147         }
148
149         LIBCFS_FREE(crpc, sizeof(*crpc));
150
151         return rc;
152 }
153
154 void
155 lstcon_rpc_put(lstcon_rpc_t *crpc)
156 {
157         srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
158         int          i;
159
160         LASSERT (cfs_list_empty(&crpc->crp_link));
161
162         for (i = 0; i < bulk->bk_niov; i++) {
163                 if (bulk->bk_iovs[i].kiov_page == NULL)
164                         continue;
165
166                 cfs_free_page(bulk->bk_iovs[i].kiov_page);
167         }
168
169         srpc_client_rpc_decref(crpc->crp_rpc);
170
171         if (crpc->crp_static) {
172                 /* Static RPC, not allocated */
173                 memset(crpc, 0, sizeof(*crpc));
174                 crpc->crp_static = 1;
175
176         } else {
177                 cfs_spin_lock(&console_session.ses_rpc_lock);
178
179                 cfs_list_add(&crpc->crp_link,
180                              &console_session.ses_rpc_freelist);
181
182                 cfs_spin_unlock(&console_session.ses_rpc_lock);
183         }
184
185         /* RPC is not alive now */
186         cfs_atomic_dec(&console_session.ses_rpc_counter);
187 }
188
189 void
190 lstcon_rpc_post(lstcon_rpc_t *crpc)
191 {
192         lstcon_rpc_trans_t *trans = crpc->crp_trans;
193
194         LASSERT (trans != NULL);
195
196         cfs_atomic_inc(&trans->tas_remaining);
197         crpc->crp_posted = 1;
198
199         sfw_post_rpc(crpc->crp_rpc);
200 }
201
202 static char *
203 lstcon_rpc_trans_name(int transop)
204 {
205         if (transop == LST_TRANS_SESNEW)
206                 return "SESNEW";
207
208         if (transop == LST_TRANS_SESEND)
209                 return "SESEND";
210
211         if (transop == LST_TRANS_SESQRY)
212                 return "SESQRY";
213
214         if (transop == LST_TRANS_SESPING)
215                 return "SESPING";
216
217         if (transop == LST_TRANS_TSBCLIADD)
218                 return "TSBCLIADD";
219
220         if (transop == LST_TRANS_TSBSRVADD)
221                 return "TSBSRVADD";
222
223         if (transop == LST_TRANS_TSBRUN)
224                 return "TSBRUN";
225
226         if (transop == LST_TRANS_TSBSTOP)
227                 return "TSBSTOP";
228
229         if (transop == LST_TRANS_TSBCLIQRY)
230                 return "TSBCLIQRY";
231
232         if (transop == LST_TRANS_TSBSRVQRY)
233                 return "TSBSRVQRY";
234
235         if (transop == LST_TRANS_STATQRY)
236                 return "STATQRY";
237
238         return "Unknown";
239 }
240
241 int
242 lstcon_rpc_trans_prep(cfs_list_t *translist,
243                       int transop, lstcon_rpc_trans_t **transpp)
244 {
245         lstcon_rpc_trans_t *trans;
246
247         if (translist != NULL) {
248                 cfs_list_for_each_entry_typed(trans, translist,
249                                               lstcon_rpc_trans_t, tas_link) {
250                         /* Can't enqueue two private transaction on
251                          * the same object */
252                         if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
253                                 return -EPERM;
254                 }
255         }
256
257         /* create a trans group */
258         LIBCFS_ALLOC(trans, sizeof(*trans));
259         if (trans == NULL)
260                 return -ENOMEM;
261         
262         trans->tas_opc = transop;
263
264         if (translist == NULL)       
265                 CFS_INIT_LIST_HEAD(&trans->tas_olink);
266         else
267                 cfs_list_add_tail(&trans->tas_olink, translist);
268
269         cfs_list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
270
271         CFS_INIT_LIST_HEAD(&trans->tas_rpcs_list);
272         cfs_atomic_set(&trans->tas_remaining, 0);
273         cfs_waitq_init(&trans->tas_waitq);
274
275         *transpp = trans;
276
277         return 0;
278 }
279
280 void
281 lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
282 {
283         cfs_list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
284         crpc->crp_trans = trans;
285 }
286
287 void
288 lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
289 {
290         srpc_client_rpc_t *rpc;
291         lstcon_rpc_t      *crpc;
292         lstcon_node_t     *nd;
293
294         cfs_list_for_each_entry_typed (crpc, &trans->tas_rpcs_list,
295                                        lstcon_rpc_t, crp_link) {
296                 rpc = crpc->crp_rpc;
297
298                 cfs_spin_lock(&rpc->crpc_lock);
299
300                 if (!crpc->crp_posted || /* not posted */
301                     crpc->crp_stamp != 0) { /* rpc done or aborted already */
302                         if (crpc->crp_stamp == 0) {
303                                 crpc->crp_stamp = cfs_time_current();
304                                 crpc->crp_status = -EINTR;
305                         }
306                         cfs_spin_unlock(&rpc->crpc_lock);
307                         continue;
308                 }
309
310                 crpc->crp_stamp  = cfs_time_current();
311                 crpc->crp_status = error;
312
313                 cfs_spin_unlock(&rpc->crpc_lock);
314
315                 sfw_abort_rpc(rpc);
316
317                 if  (error != ETIMEDOUT)
318                         continue;
319
320                 nd = crpc->crp_node;
321                 if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
322                         continue;
323
324                 nd->nd_stamp = crpc->crp_stamp;
325                 nd->nd_state = LST_NODE_DOWN;
326         }
327 }
328
329 static int
330 lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
331 {
332         if (console_session.ses_shutdown &&
333             !cfs_list_empty(&trans->tas_olink)) /* Not an end session RPC */
334                 return 1;
335
336         return (cfs_atomic_read(&trans->tas_remaining) == 0) ? 1: 0;
337 }
338
339 int
340 lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
341 {
342         lstcon_rpc_t  *crpc;
343         int            rc;
344
345         if (cfs_list_empty(&trans->tas_rpcs_list))
346                 return 0;
347
348         if (timeout < LST_TRANS_MIN_TIMEOUT)
349                 timeout = LST_TRANS_MIN_TIMEOUT;
350
351         CDEBUG(D_NET, "Transaction %s started\n",
352                lstcon_rpc_trans_name(trans->tas_opc));
353
354         /* post all requests */
355         cfs_list_for_each_entry_typed (crpc, &trans->tas_rpcs_list,
356                                        lstcon_rpc_t, crp_link) {
357                 LASSERT (!crpc->crp_posted);
358
359                 lstcon_rpc_post(crpc);
360         }
361
362         cfs_mutex_unlock(&console_session.ses_mutex);
363
364         cfs_waitq_wait_event_interruptible_timeout(trans->tas_waitq,
365                                               lstcon_rpc_trans_check(trans),
366                                               cfs_time_seconds(timeout), rc);
367
368         rc = (rc > 0)? 0: ((rc < 0)? -EINTR: -ETIMEDOUT);
369
370         cfs_mutex_lock(&console_session.ses_mutex);
371
372         if (console_session.ses_shutdown)
373                 rc = -ESHUTDOWN;
374
375         if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) {
376                 /* treat short timeout as canceled */
377                 if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
378                         rc = -EINTR;
379
380                 lstcon_rpc_trans_abort(trans, rc);
381         }
382
383         CDEBUG(D_NET, "Transaction %s stopped: %d\n",
384                lstcon_rpc_trans_name(trans->tas_opc), rc);
385
386         lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
387
388         return rc;
389 }
390
391 int
392 lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
393 {
394         lstcon_node_t        *nd  = crpc->crp_node;
395         srpc_client_rpc_t    *rpc = crpc->crp_rpc;
396         srpc_generic_reply_t *rep;
397
398         LASSERT (nd != NULL && rpc != NULL);
399         LASSERT (crpc->crp_stamp != 0);
400
401         if (crpc->crp_status != 0) {
402                 *msgpp = NULL;
403                 return crpc->crp_status;
404         }
405
406         *msgpp = &rpc->crpc_replymsg;
407         if (!crpc->crp_unpacked) {
408                 sfw_unpack_message(*msgpp);
409                 crpc->crp_unpacked = 1;
410         }
411        
412         if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
413                 return 0;
414
415         nd->nd_stamp = crpc->crp_stamp;
416         rep = &(*msgpp)->msg_body.reply;
417
418         if (rep->sid.ses_nid == LNET_NID_ANY)
419                 nd->nd_state = LST_NODE_UNKNOWN;
420         else if (lstcon_session_match(rep->sid))
421                 nd->nd_state = LST_NODE_ACTIVE;
422         else
423                 nd->nd_state = LST_NODE_BUSY;
424
425         return 0;
426 }
427
428 void
429 lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
430 {
431         lstcon_rpc_t      *crpc;
432         srpc_msg_t        *rep;
433         int                error;
434
435         LASSERT (stat != NULL);
436
437         memset(stat, 0, sizeof(*stat));
438
439         cfs_list_for_each_entry_typed(crpc, &trans->tas_rpcs_list,
440                                       lstcon_rpc_t, crp_link) {
441                 lstcon_rpc_stat_total(stat, 1);
442
443                 LASSERT (crpc->crp_stamp != 0);
444
445                 error = lstcon_rpc_get_reply(crpc, &rep);
446                 if (error != 0) {
447                         lstcon_rpc_stat_failure(stat, 1);
448                         if (stat->trs_rpc_errno == 0)
449                                 stat->trs_rpc_errno = -error;
450
451                         continue;
452                 }
453
454                 lstcon_rpc_stat_success(stat, 1);
455
456                 lstcon_rpc_stat_reply(trans->tas_opc, rep,
457                                       crpc->crp_node, stat);
458         }
459
460         CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
461                       "RPC error(%d), Framework error(%d)\n",
462                lstcon_rpc_trans_name(trans->tas_opc),
463                lstcon_rpc_stat_success(stat, 0),
464                lstcon_rpc_stat_failure(stat, 0),
465                lstcon_rpc_stat_total(stat, 0),
466                stat->trs_rpc_errno, stat->trs_fwk_errno);
467
468         return;
469 }
470
471 int
472 lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans,
473                              cfs_list_t *head_up,
474                              lstcon_rpc_readent_func_t readent)
475 {
476         cfs_list_t            tmp;
477         cfs_list_t           *next;
478         lstcon_rpc_ent_t     *ent;
479         srpc_generic_reply_t *rep;
480         lstcon_rpc_t         *crpc;
481         srpc_msg_t           *msg;
482         lstcon_node_t        *nd;
483         cfs_duration_t        dur;
484         struct timeval        tv;
485         int                   error;
486
487         LASSERT (head_up != NULL);
488
489         next = head_up;
490
491         cfs_list_for_each_entry_typed(crpc, &trans->tas_rpcs_list,
492                                       lstcon_rpc_t, crp_link) {
493                 if (cfs_copy_from_user(&tmp, next,
494                                        sizeof(cfs_list_t)))
495                         return -EFAULT;
496
497                 if (tmp.next == head_up)
498                         return 0;
499
500                 next = tmp.next;
501
502                 ent = cfs_list_entry(next, lstcon_rpc_ent_t, rpe_link);
503
504                 LASSERT (crpc->crp_stamp != 0);
505
506                 error = lstcon_rpc_get_reply(crpc, &msg);
507
508                 nd = crpc->crp_node;
509
510                 dur = (cfs_duration_t)cfs_time_sub(crpc->crp_stamp,
511                       (cfs_time_t)console_session.ses_id.ses_stamp);
512                 cfs_duration_usec(dur, &tv);
513
514                 if (cfs_copy_to_user(&ent->rpe_peer,
515                                      &nd->nd_id, sizeof(lnet_process_id_t)) ||
516                     cfs_copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) ||
517                     cfs_copy_to_user(&ent->rpe_state,
518                                      &nd->nd_state, sizeof(nd->nd_state)) ||
519                     cfs_copy_to_user(&ent->rpe_rpc_errno, &error,
520                                      sizeof(error)))
521                         return -EFAULT;
522
523                 if (error != 0)
524                         continue;
525
526                 /* RPC is done */
527                 rep = (srpc_generic_reply_t *)&msg->msg_body.reply;
528
529                 if (cfs_copy_to_user(&ent->rpe_sid,
530                                      &rep->sid, sizeof(lst_sid_t)) ||
531                     cfs_copy_to_user(&ent->rpe_fwk_errno,
532                                      &rep->status, sizeof(rep->status)))
533                         return -EFAULT;
534
535                 if (readent == NULL)
536                         continue;
537
538                 if ((error = readent(trans->tas_opc, msg, ent)) != 0)
539                         return error;
540         }
541
542         return 0;
543 }
544
545 void
546 lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans)
547 {
548         srpc_client_rpc_t *rpc;
549         lstcon_rpc_t      *crpc;
550         lstcon_rpc_t      *tmp;
551         int                count = 0;
552
553         cfs_list_for_each_entry_safe_typed(crpc, tmp,
554                                            &trans->tas_rpcs_list,
555                                            lstcon_rpc_t, crp_link) {
556                 rpc = crpc->crp_rpc;
557
558                 cfs_spin_lock(&rpc->crpc_lock);
559
560                 /* free it if not posted or finished already */
561                 if (!crpc->crp_posted || crpc->crp_finished) {
562                         cfs_spin_unlock(&rpc->crpc_lock);
563
564                         cfs_list_del_init(&crpc->crp_link);
565                         lstcon_rpc_put(crpc);
566
567                         continue;
568                 }
569
570                 /* rpcs can be still not callbacked (even LNetMDUnlink is called)
571                  * because huge timeout for inaccessible network, don't make
572                  * user wait for them, just abandon them, they will be recycled
573                  * in callback */
574
575                 LASSERT (crpc->crp_status != 0);
576
577                 crpc->crp_node  = NULL;
578                 crpc->crp_trans = NULL;
579                 cfs_list_del_init(&crpc->crp_link);
580                 count ++;
581
582                 cfs_spin_unlock(&rpc->crpc_lock);
583
584                 cfs_atomic_dec(&trans->tas_remaining);
585         }
586
587         LASSERT (cfs_atomic_read(&trans->tas_remaining) == 0);
588
589         cfs_list_del(&trans->tas_link);
590         if (!cfs_list_empty(&trans->tas_olink))
591                 cfs_list_del(&trans->tas_olink);
592
593         CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
594                lstcon_rpc_trans_name(trans->tas_opc), count);
595
596         LIBCFS_FREE(trans, sizeof(*trans));
597
598         return;
599 }
600
601 int
602 lstcon_sesrpc_prep(lstcon_node_t *nd, int transop, lstcon_rpc_t **crpc)
603 {
604         srpc_mksn_reqst_t *msrq;
605         srpc_rmsn_reqst_t *rsrq;
606         int                rc;
607
608         switch (transop) {
609         case LST_TRANS_SESNEW:
610                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION, 0, crpc);
611                 if (rc != 0)
612                         return rc;
613
614                 msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
615                 msrq->mksn_sid     = console_session.ses_id;
616                 msrq->mksn_force   = console_session.ses_force;
617                 strncpy(msrq->mksn_name, console_session.ses_name,
618                         strlen(console_session.ses_name));
619                 break;
620
621         case LST_TRANS_SESEND:
622                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION, 0, crpc);
623                 if (rc != 0)
624                         return rc;
625
626                 rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
627                 rsrq->rmsn_sid = console_session.ses_id;
628                 break;
629
630         default:
631                 LBUG();
632         }
633
634         return 0;
635 }
636
637 int
638 lstcon_dbgrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc)
639 {
640         srpc_debug_reqst_t *drq;
641         int                 rc;
642
643         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, 0, crpc);
644         if (rc != 0)
645                 return rc;
646
647         drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
648
649         drq->dbg_sid   = console_session.ses_id;
650         drq->dbg_flags = 0;
651         
652         return rc;
653 }
654
655 int
656 lstcon_batrpc_prep(lstcon_node_t *nd, int transop,
657                    lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc)
658 {
659         lstcon_batch_t     *batch;
660         srpc_batch_reqst_t *brq;
661         int                 rc;
662
663         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, 0, crpc);
664         if (rc != 0)
665                 return rc;
666
667         brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
668
669         brq->bar_sid     = console_session.ses_id;
670         brq->bar_bid     = tsb->tsb_id;
671         brq->bar_testidx = tsb->tsb_index;
672         brq->bar_opc     = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
673                            (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP:
674                             SRPC_BATCH_OPC_QUERY);
675
676         if (transop != LST_TRANS_TSBRUN &&
677             transop != LST_TRANS_TSBSTOP)
678                 return 0;
679
680         LASSERT (tsb->tsb_index == 0);
681
682         batch = (lstcon_batch_t *)tsb;
683         brq->bar_arg = batch->bat_arg;
684         
685         return 0;
686 }
687
688 int
689 lstcon_statrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc)
690 {
691         srpc_stat_reqst_t *srq;
692         int                rc;
693
694         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, 0, crpc);
695         if (rc != 0)
696                 return rc;
697
698         srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
699
700         srq->str_sid  = console_session.ses_id;
701         srq->str_type = 0; /* XXX remove it */
702
703         return 0;
704 }
705
706 lnet_process_id_packed_t *
707 lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov)
708 {
709         lnet_process_id_packed_t *pid;
710         int                       i;
711
712         i = idx / SFW_ID_PER_PAGE;
713         
714         LASSERT (i < nkiov);
715
716         pid = (lnet_process_id_packed_t *)cfs_page_address(kiov[i].kiov_page);
717
718         return &pid[idx % SFW_ID_PER_PAGE];
719 }
720
721 int
722 lstcon_dstnodes_prep(lstcon_group_t *grp, int idx,
723                      int dist, int span, int nkiov, lnet_kiov_t *kiov)
724 {
725         lnet_process_id_packed_t *pid;
726         lstcon_ndlink_t          *ndl;
727         lstcon_node_t            *nd;
728         int                       start;
729         int                       end;
730         int                       i = 0;
731
732         LASSERT (dist >= 1);
733         LASSERT (span >= 1);
734         LASSERT (grp->grp_nnode >= 1);
735
736         if (span > grp->grp_nnode)
737                 return -EINVAL;
738
739         start = ((idx / dist) * span) % grp->grp_nnode;
740         end   = ((idx / dist) * span + span - 1) % grp->grp_nnode;
741
742         cfs_list_for_each_entry_typed(ndl, &grp->grp_ndl_list,
743                                       lstcon_ndlink_t, ndl_link) {
744                 nd = ndl->ndl_node;
745                 if (i < start) {
746                         i ++;
747                         continue;
748                 }
749
750                 if (i > (end >= start ? end: grp->grp_nnode))
751                         break;
752
753                 pid = lstcon_next_id((i - start), nkiov, kiov);
754                 pid->nid = nd->nd_id.nid;
755                 pid->pid = nd->nd_id.pid;
756                 i++;
757         }
758
759         if (start <= end) /* done */
760                 return 0;
761
762         cfs_list_for_each_entry_typed(ndl, &grp->grp_ndl_list,
763                                       lstcon_ndlink_t, ndl_link) {
764                 if (i > grp->grp_nnode + end)
765                         break;
766
767                 nd = ndl->ndl_node;
768                 pid = lstcon_next_id((i - start), nkiov, kiov);
769                 pid->nid = nd->nd_id.nid;
770                 pid->pid = nd->nd_id.pid;
771                 i++;
772         }
773
774         return 0;
775 }
776
777 int
778 lstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req)
779 {
780         test_ping_req_t *prq = &req->tsr_u.ping;
781         
782         prq->png_size   = param->png_size;
783         prq->png_flags  = param->png_flags;
784         /* TODO dest */
785         return 0;
786 }
787
788 int
789 lstcon_bulkrpc_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
790 {
791         test_bulk_req_t *brq = &req->tsr_u.bulk;
792
793         brq->blk_opc    = param->blk_opc;
794         brq->blk_npg    = (param->blk_size + CFS_PAGE_SIZE - 1) / CFS_PAGE_SIZE;
795         brq->blk_flags  = param->blk_flags;
796
797         return 0;
798 }
799
800 int
801 lstcon_testrpc_prep(lstcon_node_t *nd, int transop,
802                     lstcon_test_t *test, lstcon_rpc_t **crpc)
803 {
804         lstcon_group_t    *sgrp = test->tes_src_grp;
805         lstcon_group_t    *dgrp = test->tes_dst_grp;
806         srpc_test_reqst_t *trq;
807         srpc_bulk_t       *bulk;
808         int                i;
809         int                n  = 0;
810         int                rc = 0;
811
812         if (transop == LST_TRANS_TSBCLIADD)
813                 n = sfw_id_pages(test->tes_span);
814
815         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, n, crpc);
816         if (rc != 0) 
817                 return rc;
818
819         trq  = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
820
821         if (transop == LST_TRANS_TSBSRVADD) {
822                 int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist;
823                 int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span;
824                 int nmax = (ndist + nspan - 1) / nspan;
825
826                 trq->tsr_ndest = 0;
827                 trq->tsr_loop  = nmax * test->tes_dist * test->tes_concur;
828
829         } else {
830                 bulk = &(*crpc)->crp_rpc->crpc_bulk;
831
832                 for (i = 0; i < n; i++) {
833                         bulk->bk_iovs[i].kiov_offset = 0;
834                         bulk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
835                         bulk->bk_iovs[i].kiov_page   = cfs_alloc_page(CFS_ALLOC_STD);
836
837                         if (bulk->bk_iovs[i].kiov_page != NULL) 
838                                 continue;
839
840                         lstcon_rpc_put(*crpc);
841                         return -ENOMEM;
842                 }
843
844                 bulk->bk_sink = 0;
845
846                 LASSERT (transop == LST_TRANS_TSBCLIADD);
847
848                 rc = lstcon_dstnodes_prep(test->tes_dst_grp,
849                                           test->tes_cliidx++, test->tes_dist,
850                                           test->tes_span, n, &bulk->bk_iovs[0]);
851                 if (rc != 0) {
852                         lstcon_rpc_put(*crpc);
853                         return rc;
854                 }
855
856                 trq->tsr_ndest = test->tes_span;
857                 trq->tsr_loop  = test->tes_loop;
858         } 
859
860         trq->tsr_sid        = console_session.ses_id;
861         trq->tsr_bid        = test->tes_hdr.tsb_id;
862         trq->tsr_concur     = test->tes_concur;
863         trq->tsr_is_client  = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
864         trq->tsr_stop_onerr = !!test->tes_stop_onerr;
865
866         switch (test->tes_type) {
867         case LST_TEST_PING:
868                 trq->tsr_service = SRPC_SERVICE_PING;
869                 rc = lstcon_pingrpc_prep((lst_test_ping_param_t *)&test->tes_param[0], trq);
870                 break;
871         case LST_TEST_BULK:
872                 trq->tsr_service = SRPC_SERVICE_BRW;
873                 rc = lstcon_bulkrpc_prep((lst_test_bulk_param_t *)&test->tes_param[0], trq);
874                 break;
875         default:
876                 LBUG();
877                 break;
878         }
879
880         return rc;
881 }
882
883 void
884 lstcon_rpc_stat_reply(int transop, srpc_msg_t *msg,
885                       lstcon_node_t *nd, lstcon_trans_stat_t *stat)
886 {
887         srpc_mksn_reply_t  *mksn_rep;
888         srpc_rmsn_reply_t  *rmsn_rep;
889         srpc_debug_reply_t *dbg_rep;
890         srpc_batch_reply_t *bat_rep;
891         srpc_test_reply_t  *test_rep;
892         srpc_stat_reply_t  *stat_rep;
893         int                 rc = 0;
894
895         switch (transop) {
896         case LST_TRANS_SESNEW:
897                 mksn_rep = &msg->msg_body.mksn_reply;
898
899                 if (mksn_rep->mksn_status == 0) {
900                         lstcon_sesop_stat_success(stat, 1);
901                         /* session timeout on remote node */
902                         nd->nd_timeout = mksn_rep->mksn_timeout;
903                         return;
904                 }
905
906                 LASSERT (mksn_rep->mksn_status == EBUSY ||
907                          mksn_rep->mksn_status == EINVAL);
908
909                 lstcon_sesop_stat_failure(stat, 1);
910                 rc = mksn_rep->mksn_status;
911                 break;
912
913         case LST_TRANS_SESEND:
914                 rmsn_rep = &msg->msg_body.rmsn_reply;
915                 /* ESRCH is not an error for end session */
916                 if (rmsn_rep->rmsn_status == 0 ||
917                     rmsn_rep->rmsn_status == ESRCH) {
918                         lstcon_sesop_stat_success(stat, 1);
919                         return;
920                 }
921
922                 LASSERT (rmsn_rep->rmsn_status == EBUSY ||
923                          rmsn_rep->rmsn_status == EINVAL);
924
925                 lstcon_sesop_stat_failure(stat, 1);
926                 rc = rmsn_rep->rmsn_status;
927                 break;
928
929         case LST_TRANS_SESQRY:
930         case LST_TRANS_SESPING:
931                 dbg_rep = &msg->msg_body.dbg_reply;
932
933                 if (dbg_rep->dbg_status == ESRCH) {
934                         lstcon_sesqry_stat_unknown(stat, 1);
935                         return;
936                 } 
937
938                 LASSERT (dbg_rep->dbg_status == 0);
939
940                 if (lstcon_session_match(dbg_rep->dbg_sid))
941                         lstcon_sesqry_stat_active(stat, 1);
942                 else
943                         lstcon_sesqry_stat_busy(stat, 1);
944                 return;
945
946         case LST_TRANS_TSBRUN:
947         case LST_TRANS_TSBSTOP:
948                 bat_rep = &msg->msg_body.bat_reply;
949
950                 if (bat_rep->bar_status == 0) {
951                         lstcon_tsbop_stat_success(stat, 1);
952                         return;
953                 }
954
955                 if (bat_rep->bar_status == EPERM && 
956                     transop == LST_TRANS_TSBSTOP) {
957                         lstcon_tsbop_stat_success(stat, 1);
958                         return;
959                 }
960
961                 lstcon_tsbop_stat_failure(stat, 1);
962                 rc = bat_rep->bar_status;
963                 break;
964
965         case LST_TRANS_TSBCLIQRY:
966         case LST_TRANS_TSBSRVQRY:
967                 bat_rep = &msg->msg_body.bat_reply;
968
969                 if (bat_rep->bar_active != 0) 
970                         lstcon_tsbqry_stat_run(stat, 1);
971                 else
972                         lstcon_tsbqry_stat_idle(stat, 1);
973
974                 if (bat_rep->bar_status == 0) 
975                         return;
976
977                 lstcon_tsbqry_stat_failure(stat, 1);
978                 rc = bat_rep->bar_status;
979                 break;
980
981         case LST_TRANS_TSBCLIADD:
982         case LST_TRANS_TSBSRVADD:
983                 test_rep = &msg->msg_body.tes_reply;
984
985                 if (test_rep->tsr_status == 0) {
986                         lstcon_tsbop_stat_success(stat, 1);
987                         return;
988                 }
989
990                 lstcon_tsbop_stat_failure(stat, 1);
991                 rc = test_rep->tsr_status;
992                 break;
993
994         case LST_TRANS_STATQRY:
995                 stat_rep = &msg->msg_body.stat_reply;
996
997                 if (stat_rep->str_status == 0) {
998                         lstcon_statqry_stat_success(stat, 1);
999                         return;
1000                 }
1001
1002                 lstcon_statqry_stat_failure(stat, 1);
1003                 rc = stat_rep->str_status;
1004                 break;
1005
1006         default:
1007                 LBUG();
1008         }
1009
1010         if (stat->trs_fwk_errno == 0)
1011                 stat->trs_fwk_errno = rc;
1012
1013         return;
1014 }
1015
1016 int
1017 lstcon_rpc_trans_ndlist(cfs_list_t *ndlist,
1018                         cfs_list_t *translist, int transop,
1019                         void *arg, lstcon_rpc_cond_func_t condition,
1020                         lstcon_rpc_trans_t **transpp)
1021 {
1022         lstcon_rpc_trans_t *trans;
1023         lstcon_ndlink_t    *ndl;
1024         lstcon_node_t      *nd;
1025         lstcon_rpc_t       *rpc;
1026         int                 rc;
1027
1028         /* Creating session RPG for list of nodes */
1029
1030         rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1031         if (rc != 0) {
1032                 CERROR("Can't create transaction %d: %d\n", transop, rc);
1033                 return rc;
1034         }
1035
1036         cfs_list_for_each_entry_typed(ndl, ndlist, lstcon_ndlink_t, ndl_link) {
1037                 rc = condition == NULL ? 1 :
1038                      condition(transop, ndl->ndl_node, arg);
1039
1040                 if (rc == 0)
1041                         continue;
1042
1043                 if (rc < 0) {
1044                         CDEBUG(D_NET, "Condition error while creating RPC "
1045                                       " for transaction %d: %d\n", transop, rc);
1046                         break;
1047                 }
1048
1049                 nd = ndl->ndl_node;
1050
1051                 switch (transop) {
1052                 case LST_TRANS_SESNEW:
1053                 case LST_TRANS_SESEND:
1054                         rc = lstcon_sesrpc_prep(nd, transop, &rpc);
1055                         break;
1056                 case LST_TRANS_SESQRY:
1057                 case LST_TRANS_SESPING:
1058                         rc = lstcon_dbgrpc_prep(nd, &rpc);
1059                         break;
1060                 case LST_TRANS_TSBCLIADD:
1061                 case LST_TRANS_TSBSRVADD:
1062                         rc = lstcon_testrpc_prep(nd, transop,
1063                                                  (lstcon_test_t *)arg, &rpc);
1064                         break;
1065                 case LST_TRANS_TSBRUN:
1066                 case LST_TRANS_TSBSTOP:
1067                 case LST_TRANS_TSBCLIQRY:
1068                 case LST_TRANS_TSBSRVQRY:
1069                         rc = lstcon_batrpc_prep(nd, transop,
1070                                                 (lstcon_tsb_hdr_t *)arg, &rpc);
1071                         break;
1072                 case LST_TRANS_STATQRY:
1073                         rc = lstcon_statrpc_prep(nd, &rpc);
1074                         break;
1075                 default:
1076                         rc = -EINVAL;
1077                         break;
1078                 }
1079
1080                 if (rc != 0) {
1081                         CERROR("Failed to create RPC for transaction %s: %d\n",
1082                                lstcon_rpc_trans_name(transop), rc);
1083                         break;
1084                 }
1085                                 
1086                 lstcon_rpc_trans_addreq(trans, rpc);
1087         }
1088
1089         if (rc == 0) {
1090                 *transpp = trans;
1091                 return 0;
1092         }
1093
1094         lstcon_rpc_trans_destroy(trans);
1095
1096         return rc;
1097 }
1098
1099 void
1100 lstcon_rpc_pinger(void *arg)
1101 {
1102         stt_timer_t        *ptimer = (stt_timer_t *)arg;
1103         lstcon_rpc_trans_t *trans;
1104         lstcon_rpc_t       *crpc;
1105         srpc_msg_t         *rep;
1106         srpc_debug_reqst_t *drq;
1107         lstcon_ndlink_t    *ndl;
1108         lstcon_node_t      *nd;
1109         time_t              intv;
1110         int                 count = 0;
1111         int                 rc;
1112
1113         /* RPC pinger is a special case of transaction,
1114          * it's called by timer at 8 seconds interval.
1115          */
1116         cfs_mutex_lock(&console_session.ses_mutex);
1117
1118         if (console_session.ses_shutdown || console_session.ses_expired) {
1119                 cfs_mutex_unlock(&console_session.ses_mutex);
1120                 return;
1121         }
1122
1123         if (!console_session.ses_expired &&
1124             cfs_time_current_sec() - console_session.ses_laststamp >
1125             (time_t)console_session.ses_timeout)
1126                 console_session.ses_expired = 1;
1127
1128         trans = console_session.ses_ping;
1129
1130         LASSERT (trans != NULL);
1131
1132         cfs_list_for_each_entry_typed(ndl, &console_session.ses_ndl_list,
1133                                       lstcon_ndlink_t, ndl_link) {
1134                 nd = ndl->ndl_node;
1135
1136                 if (console_session.ses_expired) {
1137                         /* idle console, end session on all nodes */
1138                         if (nd->nd_state != LST_NODE_ACTIVE)
1139                                 continue;
1140
1141                         rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND, &crpc);
1142                         if (rc != 0) {
1143                                 CERROR("Out of memory\n");
1144                                 break;
1145                         }
1146
1147                         lstcon_rpc_trans_addreq(trans, crpc);
1148                         lstcon_rpc_post(crpc);
1149
1150                         continue;
1151                 }
1152
1153                 crpc = &nd->nd_ping;
1154
1155                 if (crpc->crp_rpc != NULL) {
1156                         LASSERT (crpc->crp_trans == trans);
1157                         LASSERT (!cfs_list_empty(&crpc->crp_link));
1158
1159                         cfs_spin_lock(&crpc->crp_rpc->crpc_lock);
1160
1161                         LASSERT (crpc->crp_posted);
1162
1163                         if (!crpc->crp_finished) {
1164                                 /* in flight */
1165                                 cfs_spin_unlock(&crpc->crp_rpc->crpc_lock);
1166                                 continue;
1167                         }
1168
1169                         cfs_spin_unlock(&crpc->crp_rpc->crpc_lock);
1170
1171                         lstcon_rpc_get_reply(crpc, &rep);
1172
1173                         cfs_list_del_init(&crpc->crp_link);
1174                 
1175                         lstcon_rpc_put(crpc);
1176                 }
1177
1178                 if (nd->nd_state != LST_NODE_ACTIVE)
1179                         continue;
1180
1181                 intv = cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1182                                                      nd->nd_stamp));
1183                 if (intv < (time_t)nd->nd_timeout / 2)
1184                         continue;
1185
1186                 rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG, 0, 0, crpc);
1187                 if (rc != 0) {
1188                         CERROR("Out of memory\n");
1189                         break;
1190                 }
1191
1192                 drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1193
1194                 drq->dbg_sid   = console_session.ses_id;
1195                 drq->dbg_flags = 0;
1196
1197                 lstcon_rpc_trans_addreq(trans, crpc);
1198                 lstcon_rpc_post(crpc);
1199
1200                 count ++;
1201         }
1202
1203         if (console_session.ses_expired) {
1204                 cfs_mutex_unlock(&console_session.ses_mutex);
1205                 return;
1206         }
1207
1208         CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1209
1210         ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL);
1211         stt_add_timer(ptimer);
1212
1213         cfs_mutex_unlock(&console_session.ses_mutex);
1214 }
1215
1216 int
1217 lstcon_rpc_pinger_start(void)
1218 {
1219         stt_timer_t    *ptimer;
1220         int             rc;
1221
1222         LASSERT (cfs_list_empty(&console_session.ses_rpc_freelist));
1223         LASSERT (cfs_atomic_read(&console_session.ses_rpc_counter) == 0);
1224
1225         rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1226                                    &console_session.ses_ping);
1227         if (rc != 0) {
1228                 CERROR("Failed to create console pinger\n");
1229                 return rc;
1230         }
1231
1232         ptimer = &console_session.ses_ping_timer;
1233         ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL);
1234
1235         stt_add_timer(ptimer);
1236
1237         return 0;
1238 }
1239
1240 void
1241 lstcon_rpc_pinger_stop(void)
1242 {
1243         LASSERT (console_session.ses_shutdown);
1244
1245         stt_del_timer(&console_session.ses_ping_timer);
1246
1247         lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1248         lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1249         lstcon_rpc_trans_destroy(console_session.ses_ping);
1250
1251         memset(lstcon_trans_stat(), 0, sizeof(lstcon_trans_stat_t));
1252
1253         console_session.ses_ping = NULL;
1254 }
1255
1256 void
1257 lstcon_rpc_cleanup_wait(void)
1258 {
1259         lstcon_rpc_trans_t *trans;
1260         lstcon_rpc_t       *crpc;
1261         cfs_list_t         *pacer;
1262         cfs_list_t          zlist;
1263
1264         /* Called with hold of global mutex */
1265
1266         LASSERT (console_session.ses_shutdown);
1267
1268         while (!cfs_list_empty(&console_session.ses_trans_list)) { 
1269                 cfs_list_for_each(pacer, &console_session.ses_trans_list) {
1270                         trans = cfs_list_entry(pacer, lstcon_rpc_trans_t,
1271                                                tas_link);
1272
1273                         CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1274                                lstcon_rpc_trans_name(trans->tas_opc));
1275
1276                         cfs_waitq_signal(&trans->tas_waitq);
1277                 }
1278
1279                 cfs_mutex_unlock(&console_session.ses_mutex);
1280
1281                 CWARN("Session is shutting down, "
1282                       "waiting for termination of transactions\n");
1283                 cfs_pause(cfs_time_seconds(1));
1284
1285                 cfs_mutex_lock(&console_session.ses_mutex);
1286         }
1287
1288         cfs_spin_lock(&console_session.ses_rpc_lock);
1289
1290         lst_wait_until((cfs_atomic_read(&console_session.ses_rpc_counter) == 0),
1291                        console_session.ses_rpc_lock,
1292                        "Network is not accessable or target is down, "
1293                        "waiting for %d console RPCs to being recycled\n",
1294                        cfs_atomic_read(&console_session.ses_rpc_counter));
1295
1296         cfs_list_add(&zlist, &console_session.ses_rpc_freelist);
1297         cfs_list_del_init(&console_session.ses_rpc_freelist);
1298
1299         cfs_spin_unlock(&console_session.ses_rpc_lock);
1300
1301         while (!cfs_list_empty(&zlist)) {
1302                 crpc = cfs_list_entry(zlist.next, lstcon_rpc_t, crp_link);
1303
1304                 cfs_list_del(&crpc->crp_link);
1305                 LIBCFS_FREE(crpc, sizeof(lstcon_rpc_t));
1306         }
1307 }
1308
1309 int
1310 lstcon_rpc_module_init(void)
1311 {
1312         CFS_INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1313         console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1314         console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1315
1316         console_session.ses_ping = NULL;
1317
1318         cfs_spin_lock_init(&console_session.ses_rpc_lock);
1319         cfs_atomic_set(&console_session.ses_rpc_counter, 0);
1320         CFS_INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1321
1322         return 0;
1323 }
1324
1325 void
1326 lstcon_rpc_module_fini(void)
1327 {
1328         LASSERT (cfs_list_empty(&console_session.ses_rpc_freelist));
1329         LASSERT (cfs_atomic_read(&console_session.ses_rpc_counter) == 0);
1330 }
1331
1332 #endif