Whamcloud - gitweb
b=22771 add changelog entry
[fs/lustre-release.git] / lnet / selftest / conrpc.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/conrpc.c
37  *
38  * Console framework rpcs
39  *
40  * Author: Liang Zhen <liangzhen@clusterfs.com>
41  */
42
43 #ifdef __KERNEL__
44
45 #include <libcfs/libcfs.h>
46 #include <lnet/lib-lnet.h>
47 #include "timer.h"
48 #include "conrpc.h"
49 #include "console.h"
50
51 void lstcon_rpc_stat_reply(int, srpc_msg_t *,
52                            lstcon_node_t *, lstcon_trans_stat_t *);
53
54 static void
55 lstcon_rpc_done(srpc_client_rpc_t *rpc)
56 {
57         lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;
58
59         LASSERT (crpc != NULL && rpc == crpc->crp_rpc);
60         LASSERT (crpc->crp_posted && !crpc->crp_finished);
61
62         spin_lock(&rpc->crpc_lock);
63
64         if (crpc->crp_trans == NULL) {
65                 /* Orphan RPC is not in any transaction, 
66                  * I'm just a poor body and nobody loves me */
67                 spin_unlock(&rpc->crpc_lock);
68
69                 /* release it */
70                 lstcon_rpc_put(crpc);
71                 return;
72         }
73
74         /* not an orphan RPC */
75         crpc->crp_finished = 1;
76
77         if (crpc->crp_stamp == 0) {
78                 /* not aborted */
79                 LASSERT (crpc->crp_status == 0);
80
81                 crpc->crp_stamp  = cfs_time_current();
82                 crpc->crp_status = rpc->crpc_status;
83         }
84
85         /* wakeup (transaction)thread if I'm the last RPC in the transaction */
86         if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
87                 cfs_waitq_signal(&crpc->crp_trans->tas_waitq);
88
89         spin_unlock(&rpc->crpc_lock);
90 }
91
92 int
93 lstcon_rpc_init(lstcon_node_t *nd, int service,
94                 int npg, int cached, lstcon_rpc_t *crpc)
95 {
96
97         crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service, 
98                                        npg, npg * CFS_PAGE_SIZE,
99                                        lstcon_rpc_done, (void *)crpc);
100         if (crpc->crp_rpc == NULL)
101                 return -ENOMEM;
102
103         crpc->crp_trans    = NULL;
104         crpc->crp_node     = nd;
105         crpc->crp_posted   = 0;
106         crpc->crp_finished = 0;
107         crpc->crp_unpacked = 0;
108         crpc->crp_status   = 0;
109         crpc->crp_stamp    = 0;
110         crpc->crp_static   = !cached;
111         CFS_INIT_LIST_HEAD(&crpc->crp_link);
112
113         atomic_inc(&console_session.ses_rpc_counter);
114
115         return 0;
116 }
117
118 int
119 lstcon_rpc_prep(lstcon_node_t *nd, int service,
120                 int npg, lstcon_rpc_t **crpcpp)
121 {
122         lstcon_rpc_t  *crpc = NULL;
123         int            rc;
124
125         spin_lock(&console_session.ses_rpc_lock);
126
127         if (!list_empty(&console_session.ses_rpc_freelist)) {
128                 crpc = list_entry(console_session.ses_rpc_freelist.next,
129                                   lstcon_rpc_t, crp_link);
130                 list_del_init(&crpc->crp_link);
131         }
132
133         spin_unlock(&console_session.ses_rpc_lock);
134
135         if (crpc == NULL) {
136                 LIBCFS_ALLOC(crpc, sizeof(*crpc));
137                 if (crpc == NULL)
138                         return -ENOMEM;
139         }
140
141         rc = lstcon_rpc_init(nd, service, npg, 1, crpc);
142         if (rc == 0) {
143                 *crpcpp = crpc;
144                 return 0;
145         }
146
147         LIBCFS_FREE(crpc, sizeof(*crpc));
148
149         return rc;
150 }
151
152 void
153 lstcon_rpc_put(lstcon_rpc_t *crpc)
154 {
155         srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
156         int          i;
157
158         LASSERT (list_empty(&crpc->crp_link));
159
160         for (i = 0; i < bulk->bk_niov; i++) {
161                 if (bulk->bk_iovs[i].kiov_page == NULL)
162                         continue;
163
164                 cfs_free_page(bulk->bk_iovs[i].kiov_page);
165         }
166
167         srpc_client_rpc_decref(crpc->crp_rpc);
168
169         if (crpc->crp_static) {
170                 /* Static RPC, not allocated */
171                 memset(crpc, 0, sizeof(*crpc));
172                 crpc->crp_static = 1;
173
174         } else {
175                 spin_lock(&console_session.ses_rpc_lock);
176
177                 list_add(&crpc->crp_link, &console_session.ses_rpc_freelist);
178
179                 spin_unlock(&console_session.ses_rpc_lock);
180         }
181
182         /* RPC is not alive now */
183         atomic_dec(&console_session.ses_rpc_counter);
184 }
185
186 void
187 lstcon_rpc_post(lstcon_rpc_t *crpc)
188 {
189         lstcon_rpc_trans_t *trans = crpc->crp_trans;
190
191         LASSERT (trans != NULL);
192
193         atomic_inc(&trans->tas_remaining);
194         crpc->crp_posted = 1;
195
196         sfw_post_rpc(crpc->crp_rpc);
197 }
198
199 static char *
200 lstcon_rpc_trans_name(int transop)
201 {
202         if (transop == LST_TRANS_SESNEW)
203                 return "SESNEW";
204
205         if (transop == LST_TRANS_SESEND)
206                 return "SESEND";
207
208         if (transop == LST_TRANS_SESQRY)
209                 return "SESQRY";
210
211         if (transop == LST_TRANS_SESPING)
212                 return "SESPING";
213
214         if (transop == LST_TRANS_TSBCLIADD)
215                 return "TSBCLIADD";
216
217         if (transop == LST_TRANS_TSBSRVADD)
218                 return "TSBSRVADD";
219
220         if (transop == LST_TRANS_TSBRUN)
221                 return "TSBRUN";
222
223         if (transop == LST_TRANS_TSBSTOP)
224                 return "TSBSTOP";
225
226         if (transop == LST_TRANS_TSBCLIQRY)
227                 return "TSBCLIQRY";
228
229         if (transop == LST_TRANS_TSBSRVQRY)
230                 return "TSBSRVQRY";
231
232         if (transop == LST_TRANS_STATQRY)
233                 return "STATQRY";
234
235         return "Unknown";
236 }
237
238 int
239 lstcon_rpc_trans_prep(struct list_head *translist,
240                       int transop, lstcon_rpc_trans_t **transpp)
241 {
242         lstcon_rpc_trans_t *trans;
243
244         if (translist != NULL) {
245                 list_for_each_entry(trans, translist, tas_link) {
246                         /* Can't enqueue two private transaction on
247                          * the same object */
248                         if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
249                                 return -EPERM;
250                 }
251         }
252
253         /* create a trans group */
254         LIBCFS_ALLOC(trans, sizeof(*trans));
255         if (trans == NULL)
256                 return -ENOMEM;
257         
258         trans->tas_opc = transop;
259
260         if (translist == NULL)       
261                 CFS_INIT_LIST_HEAD(&trans->tas_olink);
262         else
263                 list_add_tail(&trans->tas_olink, translist);
264
265         list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
266
267         CFS_INIT_LIST_HEAD(&trans->tas_rpcs_list);
268         atomic_set(&trans->tas_remaining, 0);
269         cfs_waitq_init(&trans->tas_waitq);
270
271         *transpp = trans;
272
273         return 0;
274 }
275
276 void
277 lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
278 {
279         list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
280         crpc->crp_trans = trans;
281 }
282
283 void
284 lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
285 {
286         srpc_client_rpc_t *rpc;
287         lstcon_rpc_t      *crpc;
288         lstcon_node_t     *nd;
289
290         list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
291                 rpc = crpc->crp_rpc;
292
293                 spin_lock(&rpc->crpc_lock);
294
295                 if (!crpc->crp_posted || /* not posted */
296                     crpc->crp_stamp != 0) { /* rpc done or aborted already */
297                         if (crpc->crp_stamp == 0) {
298                                 crpc->crp_stamp = cfs_time_current();
299                                 crpc->crp_status = -EINTR;
300                         }
301                         spin_unlock(&rpc->crpc_lock);
302                         continue;
303                 }
304
305                 crpc->crp_stamp  = cfs_time_current();
306                 crpc->crp_status = error;
307
308                 spin_unlock(&rpc->crpc_lock);
309
310                 sfw_abort_rpc(rpc);
311
312                 if  (error != ETIMEDOUT)
313                         continue;
314
315                 nd = crpc->crp_node;
316                 if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
317                         continue;
318
319                 nd->nd_stamp = crpc->crp_stamp;
320                 nd->nd_state = LST_NODE_DOWN;
321         }
322 }
323
324 static int
325 lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
326 {
327         if (console_session.ses_shutdown &&
328             !list_empty(&trans->tas_olink)) /* It's not an end session RPC */
329                 return 1;
330
331         return (atomic_read(&trans->tas_remaining) == 0) ? 1: 0;
332 }
333
334 int
335 lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
336 {
337         lstcon_rpc_t  *crpc;
338         int            rc;
339
340         if (list_empty(&trans->tas_rpcs_list))
341                 return 0;
342
343         if (timeout < LST_TRANS_MIN_TIMEOUT)
344                 timeout = LST_TRANS_MIN_TIMEOUT;
345
346         CDEBUG(D_NET, "Transaction %s started\n",
347                lstcon_rpc_trans_name(trans->tas_opc));
348
349         /* post all requests */
350         list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
351                 LASSERT (!crpc->crp_posted);
352
353                 lstcon_rpc_post(crpc);
354         }
355
356         mutex_up(&console_session.ses_mutex);
357
358         rc = cfs_waitq_wait_event_interruptible_timeout(trans->tas_waitq,
359                                               lstcon_rpc_trans_check(trans),
360                                               timeout * HZ);
361
362         rc = (rc > 0)? 0: ((rc < 0)? -EINTR: -ETIMEDOUT);
363
364         mutex_down(&console_session.ses_mutex);
365
366         if (console_session.ses_shutdown)
367                 rc = -ESHUTDOWN;
368
369         if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) {
370                 /* treat short timeout as canceled */
371                 if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
372                         rc = -EINTR;
373
374                 lstcon_rpc_trans_abort(trans, rc);
375         }
376
377         CDEBUG(D_NET, "Transaction %s stopped: %d\n",
378                lstcon_rpc_trans_name(trans->tas_opc), rc);
379
380         lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
381
382         return rc;
383 }
384
385 int
386 lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
387 {
388         lstcon_node_t        *nd  = crpc->crp_node;
389         srpc_client_rpc_t    *rpc = crpc->crp_rpc;
390         srpc_generic_reply_t *rep;
391
392         LASSERT (nd != NULL && rpc != NULL);
393         LASSERT (crpc->crp_stamp != 0);
394
395         if (crpc->crp_status != 0) {
396                 *msgpp = NULL;
397                 return crpc->crp_status;
398         }
399
400         *msgpp = &rpc->crpc_replymsg;
401         if (!crpc->crp_unpacked) {
402                 sfw_unpack_message(*msgpp);
403                 crpc->crp_unpacked = 1;
404         }
405        
406         if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
407                 return 0;
408
409         nd->nd_stamp = crpc->crp_stamp;
410         rep = &(*msgpp)->msg_body.reply;
411
412         if (rep->sid.ses_nid == LNET_NID_ANY)
413                 nd->nd_state = LST_NODE_UNKNOWN;
414         else if (lstcon_session_match(rep->sid))
415                 nd->nd_state = LST_NODE_ACTIVE;
416         else
417                 nd->nd_state = LST_NODE_BUSY;
418
419         return 0;
420 }
421
422 void
423 lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
424 {
425         lstcon_rpc_t      *crpc;
426         srpc_client_rpc_t *rpc;
427         srpc_msg_t        *rep;
428         int                error;
429
430         LASSERT (stat != NULL);
431
432         memset(stat, 0, sizeof(*stat));
433
434         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
435                 lstcon_rpc_stat_total(stat, 1);
436
437                 rpc = crpc->crp_rpc;
438
439                 LASSERT (crpc->crp_stamp != 0);
440
441                 error = lstcon_rpc_get_reply(crpc, &rep);
442                 if (error != 0) {
443                         lstcon_rpc_stat_failure(stat, 1);
444                         if (stat->trs_rpc_errno == 0)
445                                 stat->trs_rpc_errno = -error;
446
447                         continue;
448                 }
449
450                 lstcon_rpc_stat_success(stat, 1);
451
452                 lstcon_rpc_stat_reply(trans->tas_opc, rep,
453                                       crpc->crp_node, stat);
454         }
455
456         CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
457                       "RPC error(%d), Framework error(%d)\n",
458                lstcon_rpc_trans_name(trans->tas_opc),
459                lstcon_rpc_stat_success(stat, 0),
460                lstcon_rpc_stat_failure(stat, 0),
461                lstcon_rpc_stat_total(stat, 0),
462                stat->trs_rpc_errno, stat->trs_fwk_errno);
463
464         return;
465 }
466
467 int
468 lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans,
469                              struct list_head *head_up,
470                              lstcon_rpc_readent_func_t readent)
471 {
472         struct list_head      tmp;
473         struct list_head     *next;
474         lstcon_rpc_ent_t     *ent;
475         srpc_generic_reply_t *rep;
476         srpc_client_rpc_t    *rpc;
477         lstcon_rpc_t         *crpc;
478         srpc_msg_t           *msg;
479         lstcon_node_t        *nd;
480         cfs_duration_t        dur;
481         struct timeval        tv;
482         int                   error;
483
484         LASSERT (head_up != NULL);
485
486         next = head_up;
487
488         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
489                 if (copy_from_user(&tmp, next, sizeof(struct list_head)))
490                         return -EFAULT;
491
492                 if (tmp.next == head_up)
493                         return 0;
494
495                 next = tmp.next;
496
497                 ent = list_entry(next, lstcon_rpc_ent_t, rpe_link);
498
499                 rpc = crpc->crp_rpc;
500
501                 LASSERT (crpc->crp_stamp != 0);
502
503                 error = lstcon_rpc_get_reply(crpc, &msg);
504
505                 nd = crpc->crp_node;
506
507                 dur = cfs_time_sub(crpc->crp_stamp,
508                                    console_session.ses_id.ses_stamp);
509                 cfs_duration_usec(dur, &tv);
510
511                 if (copy_to_user(&ent->rpe_peer,
512                                  &nd->nd_id, sizeof(lnet_process_id_t)) ||
513                     copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) ||
514                     copy_to_user(&ent->rpe_state,
515                                  &nd->nd_state, sizeof(nd->nd_state)) ||
516                     copy_to_user(&ent->rpe_rpc_errno, &error, sizeof(error)))
517                         return -EFAULT;
518
519                 if (error != 0)
520                         continue;
521
522                 /* RPC is done */
523                 rep = (srpc_generic_reply_t *)&msg->msg_body.reply;
524
525                 if (copy_to_user(&ent->rpe_sid,
526                                  &rep->sid, sizeof(lst_sid_t)) ||
527                     copy_to_user(&ent->rpe_fwk_errno,
528                                  &rep->status, sizeof(rep->status)))
529                         return -EFAULT;
530
531                 if (readent == NULL)
532                         continue;
533
534                 if ((error = readent(trans->tas_opc, msg, ent)) != 0)
535                         return error;
536         }
537
538         return 0;
539 }
540
541 void
542 lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans)
543 {
544         srpc_client_rpc_t *rpc;
545         lstcon_rpc_t      *crpc;
546         lstcon_rpc_t      *tmp;
547         int                count = 0;
548         
549         list_for_each_entry_safe(crpc, tmp,
550                                  &trans->tas_rpcs_list, crp_link) {
551                 rpc = crpc->crp_rpc;
552
553                 spin_lock(&rpc->crpc_lock);
554
555                 /* free it if not posted or finished already */
556                 if (!crpc->crp_posted || crpc->crp_finished) {
557                         spin_unlock(&rpc->crpc_lock);
558
559                         list_del_init(&crpc->crp_link);
560                         lstcon_rpc_put(crpc);
561
562                         continue;
563                 }
564
565                 /* rpcs can be still not callbacked (even LNetMDUnlink is called)
566                  * because huge timeout for inaccessible network, don't make
567                  * user wait for them, just abandon them, they will be recycled 
568                  * in callback */
569
570                 LASSERT (crpc->crp_status != 0);
571
572                 crpc->crp_node  = NULL;
573                 crpc->crp_trans = NULL;
574                 list_del_init(&crpc->crp_link);
575                 count ++;
576
577                 spin_unlock(&rpc->crpc_lock);
578
579                 atomic_dec(&trans->tas_remaining);
580         }
581
582         LASSERT (atomic_read(&trans->tas_remaining) == 0);
583
584         list_del(&trans->tas_link);
585         if (!list_empty(&trans->tas_olink))
586                 list_del(&trans->tas_olink);
587
588         CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
589                lstcon_rpc_trans_name(trans->tas_opc), count);
590
591         LIBCFS_FREE(trans, sizeof(*trans));
592
593         return;
594 }
595
596 int
597 lstcon_sesrpc_prep(lstcon_node_t *nd, int transop, lstcon_rpc_t **crpc)
598 {
599         srpc_mksn_reqst_t *msrq;
600         srpc_rmsn_reqst_t *rsrq;
601         int                rc;
602
603         switch (transop) {
604         case LST_TRANS_SESNEW:
605                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION, 0, crpc);
606                 if (rc != 0)
607                         return rc;
608
609                 msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
610                 msrq->mksn_sid     = console_session.ses_id;
611                 msrq->mksn_force   = console_session.ses_force;
612                 strncpy(msrq->mksn_name, console_session.ses_name,
613                         strlen(console_session.ses_name));
614                 break;
615
616         case LST_TRANS_SESEND:
617                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION, 0, crpc);
618                 if (rc != 0)
619                         return rc;
620
621                 rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
622                 rsrq->rmsn_sid = console_session.ses_id;
623                 break;
624
625         default:
626                 LBUG();
627         }
628
629         return 0;
630 }
631
632 int
633 lstcon_dbgrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc)
634 {
635         srpc_debug_reqst_t *drq;
636         int                 rc;
637
638         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, 0, crpc);
639         if (rc != 0)
640                 return rc;
641
642         drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
643
644         drq->dbg_sid   = console_session.ses_id;
645         drq->dbg_flags = 0;
646         
647         return rc;
648 }
649
650 int
651 lstcon_batrpc_prep(lstcon_node_t *nd, int transop,
652                    lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc)
653 {
654         lstcon_batch_t     *batch;
655         srpc_batch_reqst_t *brq;
656         int                 rc;
657
658         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, 0, crpc);
659         if (rc != 0)
660                 return rc;
661
662         brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
663
664         brq->bar_sid     = console_session.ses_id;
665         brq->bar_bid     = tsb->tsb_id;
666         brq->bar_testidx = tsb->tsb_index;
667         brq->bar_opc     = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
668                            (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP:
669                             SRPC_BATCH_OPC_QUERY);
670
671         if (transop != LST_TRANS_TSBRUN &&
672             transop != LST_TRANS_TSBSTOP)
673                 return 0;
674
675         LASSERT (tsb->tsb_index == 0);
676
677         batch = (lstcon_batch_t *)tsb;
678         brq->bar_arg = batch->bat_arg;
679         
680         return 0;
681 }
682
683 int
684 lstcon_statrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc)
685 {
686         srpc_stat_reqst_t *srq;
687         int                rc;
688
689         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, 0, crpc);
690         if (rc != 0)
691                 return rc;
692
693         srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
694
695         srq->str_sid  = console_session.ses_id;
696         srq->str_type = 0; /* XXX remove it */
697
698         return 0;
699 }
700
701 lnet_process_id_packed_t *
702 lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov)
703 {
704         lnet_process_id_packed_t *pid;
705         int                       i;
706
707         i = idx / SFW_ID_PER_PAGE;
708         
709         LASSERT (i < nkiov);
710
711         pid = (lnet_process_id_packed_t *)cfs_page_address(kiov[i].kiov_page);
712
713         return &pid[idx % SFW_ID_PER_PAGE];
714 }
715
716 int
717 lstcon_dstnodes_prep(lstcon_group_t *grp, int idx,
718                      int dist, int span, int nkiov, lnet_kiov_t *kiov)
719 {
720         lnet_process_id_packed_t *pid;
721         lstcon_ndlink_t          *ndl;
722         lstcon_node_t            *nd;
723         int                       start;
724         int                       end;
725         int                       i = 0;
726
727         LASSERT (dist >= 1);
728         LASSERT (span >= 1);
729         LASSERT (grp->grp_nnode >= 1);
730
731         if (span > grp->grp_nnode)
732                 return -EINVAL;
733
734         start = ((idx / dist) * span) % grp->grp_nnode;
735         end   = ((idx / dist) * span + span - 1) % grp->grp_nnode;
736
737         list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
738                 nd = ndl->ndl_node;
739                 if (i < start) {
740                         i ++;
741                         continue;
742                 }
743
744                 if (i > (end >= start ? end: grp->grp_nnode))
745                         break;
746
747                 pid = lstcon_next_id((i - start), nkiov, kiov);
748                 pid->nid = nd->nd_id.nid;
749                 pid->pid = nd->nd_id.pid;
750                 i++;
751         }
752
753         if (start <= end) /* done */
754                 return 0;
755
756         list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
757                 if (i > grp->grp_nnode + end)
758                         break;
759
760                 nd = ndl->ndl_node;
761                 pid = lstcon_next_id((i - start), nkiov, kiov);
762                 pid->nid = nd->nd_id.nid;
763                 pid->pid = nd->nd_id.pid;
764                 i++;
765         }
766
767         return 0;
768 }
769
770 int
771 lstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req)
772 {
773         test_ping_req_t *prq = &req->tsr_u.ping;
774         
775         prq->png_size   = param->png_size;
776         prq->png_flags  = param->png_flags;
777         /* TODO dest */
778         return 0;
779 }
780
781 int
782 lstcon_bulkrpc_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
783 {
784         test_bulk_req_t *brq = &req->tsr_u.bulk;
785
786         brq->blk_opc    = param->blk_opc;
787         brq->blk_npg    = (param->blk_size + CFS_PAGE_SIZE - 1) / CFS_PAGE_SIZE;
788         brq->blk_flags  = param->blk_flags;
789
790         return 0;
791 }
792
793 int
794 lstcon_testrpc_prep(lstcon_node_t *nd, int transop,
795                     lstcon_test_t *test, lstcon_rpc_t **crpc)
796 {
797         lstcon_group_t    *sgrp = test->tes_src_grp;
798         lstcon_group_t    *dgrp = test->tes_dst_grp;
799         srpc_test_reqst_t *trq;
800         srpc_bulk_t       *bulk;
801         int                i;
802         int                n  = 0;
803         int                rc = 0;
804
805         if (transop == LST_TRANS_TSBCLIADD)
806                 n = sfw_id_pages(test->tes_span);
807
808         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, n, crpc);
809         if (rc != 0) 
810                 return rc;
811
812         trq  = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
813
814         if (transop == LST_TRANS_TSBSRVADD) {
815                 int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist;
816                 int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span;
817                 int nmax = (ndist + nspan - 1) / nspan;
818
819                 trq->tsr_ndest = 0;
820                 trq->tsr_loop  = nmax * test->tes_dist * test->tes_concur;
821
822         } else {
823                 bulk = &(*crpc)->crp_rpc->crpc_bulk;
824
825                 for (i = 0; i < n; i++) {
826                         bulk->bk_iovs[i].kiov_offset = 0;
827                         bulk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
828                         bulk->bk_iovs[i].kiov_page   = cfs_alloc_page(CFS_ALLOC_STD);
829
830                         if (bulk->bk_iovs[i].kiov_page != NULL) 
831                                 continue;
832
833                         lstcon_rpc_put(*crpc);
834                         return -ENOMEM;
835                 }
836
837                 bulk->bk_sink = 0;
838
839                 LASSERT (transop == LST_TRANS_TSBCLIADD);
840
841                 rc = lstcon_dstnodes_prep(test->tes_dst_grp,
842                                           test->tes_cliidx++, test->tes_dist,
843                                           test->tes_span, n, &bulk->bk_iovs[0]);
844                 if (rc != 0) {
845                         lstcon_rpc_put(*crpc);
846                         return rc;
847                 }
848
849                 trq->tsr_ndest = test->tes_span;
850                 trq->tsr_loop  = test->tes_loop;
851         } 
852
853         trq->tsr_sid        = console_session.ses_id;
854         trq->tsr_bid        = test->tes_hdr.tsb_id;
855         trq->tsr_concur     = test->tes_concur;
856         trq->tsr_is_client  = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
857         trq->tsr_stop_onerr = test->tes_stop_onerr;
858
859         switch (test->tes_type) {
860         case LST_TEST_PING:
861                 trq->tsr_service = SRPC_SERVICE_PING;
862                 rc = lstcon_pingrpc_prep((lst_test_ping_param_t *)&test->tes_param[0], trq);
863                 break;
864         case LST_TEST_BULK:
865                 trq->tsr_service = SRPC_SERVICE_BRW;
866                 rc = lstcon_bulkrpc_prep((lst_test_bulk_param_t *)&test->tes_param[0], trq);
867                 break;
868         default:
869                 LBUG();
870                 break;
871         }
872
873         return rc;
874 }
875
876 void
877 lstcon_rpc_stat_reply(int transop, srpc_msg_t *msg,
878                       lstcon_node_t *nd, lstcon_trans_stat_t *stat)
879 {
880         srpc_mksn_reply_t  *mksn_rep;
881         srpc_rmsn_reply_t  *rmsn_rep;
882         srpc_debug_reply_t *dbg_rep;
883         srpc_batch_reply_t *bat_rep;
884         srpc_test_reply_t  *test_rep;
885         srpc_stat_reply_t  *stat_rep;
886         int                 errno = 0;
887
888         switch (transop) {
889         case LST_TRANS_SESNEW:
890                 mksn_rep = &msg->msg_body.mksn_reply;
891
892                 if (mksn_rep->mksn_status == 0) {
893                         lstcon_sesop_stat_success(stat, 1);
894                         /* session timeout on remote node */
895                         nd->nd_timeout = mksn_rep->mksn_timeout;
896                         return;
897                 }
898
899                 LASSERT (mksn_rep->mksn_status == EBUSY ||
900                          mksn_rep->mksn_status == EINVAL);
901
902                 lstcon_sesop_stat_failure(stat, 1);
903                 errno = mksn_rep->mksn_status;
904                 break;
905
906         case LST_TRANS_SESEND:
907                 rmsn_rep = &msg->msg_body.rmsn_reply;
908                 /* ESRCH is not an error for end session */
909                 if (rmsn_rep->rmsn_status == 0 ||
910                     rmsn_rep->rmsn_status == ESRCH) {
911                         lstcon_sesop_stat_success(stat, 1);
912                         return;
913                 }
914
915                 LASSERT (rmsn_rep->rmsn_status == EBUSY ||
916                          rmsn_rep->rmsn_status == EINVAL);
917
918                 lstcon_sesop_stat_failure(stat, 1);
919                 errno = rmsn_rep->rmsn_status;
920                 break;
921
922         case LST_TRANS_SESQRY:
923         case LST_TRANS_SESPING:
924                 dbg_rep = &msg->msg_body.dbg_reply;
925
926                 if (dbg_rep->dbg_status == ESRCH) {
927                         lstcon_sesqry_stat_unknown(stat, 1);
928                         return;
929                 } 
930
931                 LASSERT (dbg_rep->dbg_status == 0);
932
933                 if (lstcon_session_match(dbg_rep->dbg_sid))
934                         lstcon_sesqry_stat_active(stat, 1);
935                 else
936                         lstcon_sesqry_stat_busy(stat, 1);
937                 return;
938
939         case LST_TRANS_TSBRUN:
940         case LST_TRANS_TSBSTOP:
941                 bat_rep = &msg->msg_body.bat_reply;
942
943                 if (bat_rep->bar_status == 0) {
944                         lstcon_tsbop_stat_success(stat, 1);
945                         return;
946                 }
947
948                 if (bat_rep->bar_status == EPERM && 
949                     transop == LST_TRANS_TSBSTOP) {
950                         lstcon_tsbop_stat_success(stat, 1);
951                         return;
952                 }
953
954                 lstcon_tsbop_stat_failure(stat, 1);
955                 errno = bat_rep->bar_status;
956                 break;
957
958         case LST_TRANS_TSBCLIQRY:
959         case LST_TRANS_TSBSRVQRY:
960                 bat_rep = &msg->msg_body.bat_reply;
961
962                 if (bat_rep->bar_active != 0) 
963                         lstcon_tsbqry_stat_run(stat, 1);
964                 else
965                         lstcon_tsbqry_stat_idle(stat, 1);
966
967                 if (bat_rep->bar_status == 0) 
968                         return;
969
970                 lstcon_tsbqry_stat_failure(stat, 1);
971                 errno = bat_rep->bar_status;
972                 break;
973
974         case LST_TRANS_TSBCLIADD:
975         case LST_TRANS_TSBSRVADD:
976                 test_rep = &msg->msg_body.tes_reply;
977
978                 if (test_rep->tsr_status == 0) {
979                         lstcon_tsbop_stat_success(stat, 1);
980                         return;
981                 }
982
983                 lstcon_tsbop_stat_failure(stat, 1);
984                 errno = test_rep->tsr_status;
985                 break;
986
987         case LST_TRANS_STATQRY:
988                 stat_rep = &msg->msg_body.stat_reply;
989
990                 if (stat_rep->str_status == 0) {
991                         lstcon_statqry_stat_success(stat, 1);
992                         return;
993                 }
994
995                 lstcon_statqry_stat_failure(stat, 1);
996                 errno = stat_rep->str_status;
997                 break;
998
999         default:
1000                 LBUG();
1001         }
1002
1003         if (stat->trs_fwk_errno == 0)
1004                 stat->trs_fwk_errno = errno;
1005
1006         return;
1007 }
1008
1009 int
1010 lstcon_rpc_trans_ndlist(struct list_head *ndlist,
1011                         struct list_head *translist, int transop,
1012                         void *arg, lstcon_rpc_cond_func_t condition,
1013                         lstcon_rpc_trans_t **transpp)
1014 {
1015         lstcon_rpc_trans_t *trans;
1016         lstcon_ndlink_t    *ndl;
1017         lstcon_node_t      *nd;
1018         lstcon_rpc_t       *rpc;
1019         int                 rc;
1020
1021         /* Creating session RPG for list of nodes */
1022
1023         rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1024         if (rc != 0) {
1025                 CERROR("Can't create transaction %d: %d\n", transop, rc);
1026                 return rc;
1027         }
1028
1029         list_for_each_entry(ndl, ndlist, ndl_link) {
1030                 rc = condition == NULL ? 1 :
1031                      condition(transop, ndl->ndl_node, arg);
1032
1033                 if (rc == 0)
1034                         continue;
1035
1036                 if (rc < 0) {
1037                         CDEBUG(D_NET, "Condition error while creating RPC "
1038                                       " for transaction %d: %d\n", transop, rc);
1039                         break;
1040                 }
1041
1042                 nd = ndl->ndl_node;
1043
1044                 switch (transop) {
1045                 case LST_TRANS_SESNEW:
1046                 case LST_TRANS_SESEND:
1047                         rc = lstcon_sesrpc_prep(nd, transop, &rpc);
1048                         break;
1049                 case LST_TRANS_SESQRY:
1050                 case LST_TRANS_SESPING:
1051                         rc = lstcon_dbgrpc_prep(nd, &rpc);
1052                         break;
1053                 case LST_TRANS_TSBCLIADD:
1054                 case LST_TRANS_TSBSRVADD:
1055                         rc = lstcon_testrpc_prep(nd, transop,
1056                                                  (lstcon_test_t *)arg, &rpc);
1057                         break;
1058                 case LST_TRANS_TSBRUN:
1059                 case LST_TRANS_TSBSTOP:
1060                 case LST_TRANS_TSBCLIQRY:
1061                 case LST_TRANS_TSBSRVQRY:
1062                         rc = lstcon_batrpc_prep(nd, transop,
1063                                                 (lstcon_tsb_hdr_t *)arg, &rpc);
1064                         break;
1065                 case LST_TRANS_STATQRY:
1066                         rc = lstcon_statrpc_prep(nd, &rpc);
1067                         break;
1068                 default:
1069                         rc = -EINVAL;
1070                         break;
1071                 }
1072
1073                 if (rc != 0) {
1074                         CERROR("Failed to create RPC for transaction %s: %d\n",
1075                                lstcon_rpc_trans_name(transop), rc);
1076                         break;
1077                 }
1078                                 
1079                 lstcon_rpc_trans_addreq(trans, rpc);
1080         }
1081
1082         if (rc == 0) {
1083                 *transpp = trans;
1084                 return 0;
1085         }
1086
1087         lstcon_rpc_trans_destroy(trans);
1088
1089         return rc;
1090 }
1091
1092 void
1093 lstcon_rpc_pinger(void *arg)
1094 {
1095         stt_timer_t        *ptimer = (stt_timer_t *)arg;
1096         lstcon_rpc_trans_t *trans;
1097         lstcon_rpc_t       *crpc;
1098         srpc_msg_t         *rep;
1099         srpc_debug_reqst_t *drq;
1100         lstcon_ndlink_t    *ndl;
1101         lstcon_node_t      *nd;
1102         time_t              intv;
1103         int                 count = 0;
1104         int                 rc;
1105
1106         /* RPC pinger is a special case of transaction,
1107          * it's called by timer at 8 seconds interval.
1108          */
1109         mutex_down(&console_session.ses_mutex);
1110
1111         if (console_session.ses_shutdown || console_session.ses_expired) {
1112                 mutex_up(&console_session.ses_mutex);
1113                 return;
1114         }
1115
1116         if (!console_session.ses_expired &&
1117             cfs_time_current_sec() - console_session.ses_laststamp >
1118             console_session.ses_timeout)
1119                 console_session.ses_expired = 1;
1120
1121         trans = console_session.ses_ping;
1122
1123         LASSERT (trans != NULL);
1124
1125         list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) {
1126                 nd = ndl->ndl_node;
1127
1128                 if (console_session.ses_expired) {
1129                         /* idle console, end session on all nodes */
1130                         if (nd->nd_state != LST_NODE_ACTIVE)
1131                                 continue;
1132
1133                         rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND, &crpc);
1134                         if (rc != 0) {
1135                                 CERROR("Out of memory\n");
1136                                 break;
1137                         }
1138
1139                         lstcon_rpc_trans_addreq(trans, crpc);
1140                         lstcon_rpc_post(crpc);
1141
1142                         continue;
1143                 }
1144
1145                 crpc = &nd->nd_ping;
1146
1147                 if (crpc->crp_rpc != NULL) {
1148                         LASSERT (crpc->crp_trans == trans);
1149                         LASSERT (!list_empty(&crpc->crp_link));
1150
1151                         spin_lock(&crpc->crp_rpc->crpc_lock);
1152
1153                         LASSERT (crpc->crp_posted);
1154
1155                         if (!crpc->crp_finished) {
1156                                 /* in flight */
1157                                 spin_unlock(&crpc->crp_rpc->crpc_lock);
1158                                 continue;
1159                         }
1160
1161                         spin_unlock(&crpc->crp_rpc->crpc_lock);
1162
1163                         lstcon_rpc_get_reply(crpc, &rep);
1164
1165                         list_del_init(&crpc->crp_link);
1166                 
1167                         lstcon_rpc_put(crpc);
1168                 }
1169
1170                 if (nd->nd_state != LST_NODE_ACTIVE)
1171                         continue;
1172
1173                 intv = cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1174                                                      nd->nd_stamp));
1175                 if (intv < nd->nd_timeout / 2)
1176                         continue;
1177
1178                 rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG, 0, 0, crpc);
1179                 if (rc != 0) {
1180                         CERROR("Out of memory\n");
1181                         break;
1182                 }
1183
1184                 drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1185
1186                 drq->dbg_sid   = console_session.ses_id;
1187                 drq->dbg_flags = 0;
1188
1189                 lstcon_rpc_trans_addreq(trans, crpc);
1190                 lstcon_rpc_post(crpc);
1191
1192                 count ++;
1193         }
1194
1195         if (console_session.ses_expired) {
1196                 mutex_up(&console_session.ses_mutex);
1197                 return;
1198         }
1199
1200         CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1201
1202         ptimer->stt_expires = cfs_time_current_sec() + LST_PING_INTERVAL;
1203         stt_add_timer(ptimer);
1204
1205         mutex_up(&console_session.ses_mutex);
1206 }
1207
1208 int
1209 lstcon_rpc_pinger_start(void)
1210 {
1211         stt_timer_t    *ptimer;
1212         int             rc;
1213
1214         LASSERT (list_empty(&console_session.ses_rpc_freelist));
1215         LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
1216
1217         rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1218                                    &console_session.ses_ping);
1219         if (rc != 0) {
1220                 CERROR("Failed to create console pinger\n");
1221                 return rc;
1222         }
1223
1224         ptimer = &console_session.ses_ping_timer;
1225         ptimer->stt_expires = cfs_time_current_sec() + LST_PING_INTERVAL;
1226
1227         stt_add_timer(ptimer);
1228
1229         return 0;
1230 }
1231
1232 void
1233 lstcon_rpc_pinger_stop(void)
1234 {
1235         LASSERT (console_session.ses_shutdown);
1236
1237         stt_del_timer(&console_session.ses_ping_timer);
1238
1239         lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1240         lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1241         lstcon_rpc_trans_destroy(console_session.ses_ping);
1242
1243         memset(lstcon_trans_stat(), 0, sizeof(lstcon_trans_stat_t));
1244
1245         console_session.ses_ping = NULL;
1246 }
1247
1248 void
1249 lstcon_rpc_cleanup_wait(void)
1250 {
1251         lstcon_rpc_trans_t *trans;
1252         lstcon_rpc_t       *crpc;
1253         struct list_head   *pacer;
1254         struct list_head    zlist;
1255
1256         /* Called with hold of global mutex */
1257
1258         LASSERT (console_session.ses_shutdown);
1259
1260         while (!list_empty(&console_session.ses_trans_list)) { 
1261                 list_for_each(pacer, &console_session.ses_trans_list) {
1262                         trans = list_entry(pacer, lstcon_rpc_trans_t, tas_link);
1263
1264                         CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1265                                lstcon_rpc_trans_name(trans->tas_opc));
1266
1267                         cfs_waitq_signal(&trans->tas_waitq);
1268                 }
1269
1270                 mutex_up(&console_session.ses_mutex);
1271
1272                 CWARN("Session is shutting down, "
1273                       "waiting for termination of transactions\n");
1274                 cfs_pause(cfs_time_seconds(1));
1275
1276                 mutex_down(&console_session.ses_mutex);
1277         }
1278
1279         spin_lock(&console_session.ses_rpc_lock);
1280
1281         lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0),
1282                        console_session.ses_rpc_lock,
1283                        "Network is not accessable or target is down, "
1284                        "waiting for %d console RPCs to being recycled\n",
1285                        atomic_read(&console_session.ses_rpc_counter));
1286
1287         list_add(&zlist, &console_session.ses_rpc_freelist);
1288         list_del_init(&console_session.ses_rpc_freelist);
1289
1290         spin_unlock(&console_session.ses_rpc_lock);
1291
1292         while (!list_empty(&zlist)) {
1293                 crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link);
1294
1295                 list_del(&crpc->crp_link);
1296                 LIBCFS_FREE(crpc, sizeof(lstcon_rpc_t));
1297         }
1298 }
1299
1300 int
1301 lstcon_rpc_module_init(void)
1302 {
1303         CFS_INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1304         console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1305         console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1306
1307         console_session.ses_ping = NULL;
1308
1309         spin_lock_init(&console_session.ses_rpc_lock);
1310         atomic_set(&console_session.ses_rpc_counter, 0);
1311         CFS_INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1312
1313         return 0;
1314 }
1315
1316 void
1317 lstcon_rpc_module_fini(void)
1318 {
1319         LASSERT (list_empty(&console_session.ses_rpc_freelist));
1320         LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
1321 }
1322
1323 #endif