Whamcloud - gitweb
LU-17592 build: kernel 6.8 removed strlcpy()
[fs/lustre-release.git] / lnet / selftest / conrpc.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2016, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * Console framework rpcs
14  *
15  * Author: Liang Zhen <liang@whamcloud.com>
16  */
17
18 #include <libcfs/libcfs.h>
19 #include <lnet/lib-lnet.h>
20 #include "timer.h"
21 #include "conrpc.h"
22 #include "console.h"
23
24 void lstcon_rpc_stat_reply(struct lstcon_rpc_trans *, struct srpc_msg *,
25                            struct lstcon_node *, struct lstcon_trans_stat *);
26
27 static void
28 lstcon_rpc_done(struct srpc_client_rpc *rpc)
29 {
30         struct lstcon_rpc *crpc = rpc->crpc_priv;
31
32         LASSERT(crpc != NULL && rpc == crpc->crp_rpc);
33         LASSERT(crpc->crp_posted && !crpc->crp_finished);
34
35         spin_lock(&rpc->crpc_lock);
36
37         if (crpc->crp_trans == NULL) {
38                 /* Orphan RPC is not in any transaction,
39                  * I'm just a poor body and nobody loves me */
40                 spin_unlock(&rpc->crpc_lock);
41
42                 /* release it */
43                 lstcon_rpc_put(crpc);
44                 return;
45         }
46
47         /* not an orphan RPC */
48         crpc->crp_finished = 1;
49
50         if (crpc->crp_stamp_ns == 0) {
51                 /* not aborted */
52                 LASSERT(crpc->crp_status == 0);
53
54                 crpc->crp_stamp_ns = ktime_get_ns();
55                 crpc->crp_status = rpc->crpc_status;
56         }
57
58         /* wakeup (transaction)thread if I'm the last RPC in the transaction */
59         if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
60                 wake_up(&crpc->crp_trans->tas_waitq);
61
62         spin_unlock(&rpc->crpc_lock);
63 }
64
65 static int
66 lstcon_rpc_init(struct lstcon_node *nd, int service, unsigned int feats,
67                 int bulk_npg, int bulk_len, int embedded,
68                 struct lstcon_rpc *crpc)
69 {
70         memset(crpc, 0, sizeof(*crpc));
71
72         crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
73                                        feats, bulk_npg, bulk_len,
74                                        lstcon_rpc_done, (void *)crpc);
75         if (crpc->crp_rpc == NULL)
76                 return -ENOMEM;
77
78         crpc->crp_node     = nd;
79         crpc->crp_embedded = embedded;
80         INIT_LIST_HEAD(&crpc->crp_link);
81
82         atomic_inc(&console_session.ses_rpc_counter);
83
84         return 0;
85 }
86
87 static int
88 lstcon_rpc_prep(struct lstcon_node *nd, int service, unsigned int feats,
89                 int bulk_npg, int bulk_len, struct lstcon_rpc **crpcpp)
90 {
91         struct lstcon_rpc *crpc = NULL;
92         int rc;
93
94         spin_lock(&console_session.ses_rpc_lock);
95
96         if (!list_empty(&console_session.ses_rpc_freelist)) {
97                 crpc = list_first_entry(&console_session.ses_rpc_freelist,
98                                         struct lstcon_rpc, crp_link);
99                 list_del_init(&crpc->crp_link);
100         }
101
102         spin_unlock(&console_session.ses_rpc_lock);
103
104         if (crpc == NULL) {
105                 LIBCFS_ALLOC(crpc, sizeof(*crpc));
106                 if (crpc == NULL)
107                         return -ENOMEM;
108         }
109
110         rc = lstcon_rpc_init(nd, service, feats, bulk_npg, bulk_len, 0, crpc);
111         if (rc == 0) {
112                 *crpcpp = crpc;
113                 return 0;
114         }
115
116         LIBCFS_FREE(crpc, sizeof(*crpc));
117
118         return rc;
119 }
120
121 void
122 lstcon_rpc_put(struct lstcon_rpc *crpc)
123 {
124         struct srpc_bulk *bulk = &crpc->crp_rpc->crpc_bulk;
125         int i;
126
127         LASSERT(list_empty(&crpc->crp_link));
128
129         for (i = 0; i < bulk->bk_niov; i++) {
130                 if (bulk->bk_iovs[i].bv_page == NULL)
131                         continue;
132
133                 __free_page(bulk->bk_iovs[i].bv_page);
134         }
135
136         srpc_client_rpc_decref(crpc->crp_rpc);
137
138         if (crpc->crp_embedded) {
139                 /* embedded RPC, don't recycle it */
140                 memset(crpc, 0, sizeof(*crpc));
141                 crpc->crp_embedded = 1;
142
143         } else {
144                 spin_lock(&console_session.ses_rpc_lock);
145
146                 list_add(&crpc->crp_link,
147                          &console_session.ses_rpc_freelist);
148
149                 spin_unlock(&console_session.ses_rpc_lock);
150         }
151
152         /* RPC is not alive now */
153         atomic_dec(&console_session.ses_rpc_counter);
154 }
155
156 static void
157 lstcon_rpc_post(struct lstcon_rpc *crpc)
158 {
159         struct lstcon_rpc_trans *trans = crpc->crp_trans;
160
161         LASSERT (trans != NULL);
162
163         atomic_inc(&trans->tas_remaining);
164         crpc->crp_posted = 1;
165
166         sfw_post_rpc(crpc->crp_rpc);
167 }
168
169 static char *
170 lstcon_rpc_trans_name(int transop)
171 {
172         if (transop == LST_TRANS_SESNEW)
173                 return "SESNEW";
174
175         if (transop == LST_TRANS_SESEND)
176                 return "SESEND";
177
178         if (transop == LST_TRANS_SESQRY)
179                 return "SESQRY";
180
181         if (transop == LST_TRANS_SESPING)
182                 return "SESPING";
183
184         if (transop == LST_TRANS_TSBCLIADD)
185                 return "TSBCLIADD";
186
187         if (transop == LST_TRANS_TSBSRVADD)
188                 return "TSBSRVADD";
189
190         if (transop == LST_TRANS_TSBRUN)
191                 return "TSBRUN";
192
193         if (transop == LST_TRANS_TSBSTOP)
194                 return "TSBSTOP";
195
196         if (transop == LST_TRANS_TSBCLIQRY)
197                 return "TSBCLIQRY";
198
199         if (transop == LST_TRANS_TSBSRVQRY)
200                 return "TSBSRVQRY";
201
202         if (transop == LST_TRANS_STATQRY)
203                 return "STATQRY";
204
205         return "Unknown";
206 }
207
208 int
209 lstcon_rpc_trans_prep(struct list_head *translist, int transop,
210                       struct lstcon_rpc_trans **transpp)
211 {
212         struct lstcon_rpc_trans *trans;
213
214         if (translist != NULL) {
215                 list_for_each_entry(trans, translist, tas_link) {
216                         /* Can't enqueue two private transaction on
217                          * the same object */
218                         if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
219                                 return -EPERM;
220                 }
221         }
222
223         /* create a trans group */
224         LIBCFS_ALLOC(trans, sizeof(*trans));
225         if (trans == NULL)
226                 return -ENOMEM;
227
228         trans->tas_opc = transop;
229
230         if (translist == NULL)
231                 INIT_LIST_HEAD(&trans->tas_olink);
232         else
233                 list_add_tail(&trans->tas_olink, translist);
234
235         list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
236
237         INIT_LIST_HEAD(&trans->tas_rpcs_list);
238         atomic_set(&trans->tas_remaining, 0);
239         init_waitqueue_head(&trans->tas_waitq);
240
241         spin_lock(&console_session.ses_rpc_lock);
242         trans->tas_features = console_session.ses_features;
243         spin_unlock(&console_session.ses_rpc_lock);
244
245         *transpp = trans;
246         return 0;
247 }
248
249 void
250 lstcon_rpc_trans_addreq(struct lstcon_rpc_trans *trans, struct lstcon_rpc *crpc)
251 {
252         list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
253         crpc->crp_trans = trans;
254 }
255
256 void
257 lstcon_rpc_trans_abort(struct lstcon_rpc_trans *trans, int error)
258 {
259         struct srpc_client_rpc *rpc;
260         struct lstcon_rpc *crpc;
261         struct lstcon_node *nd;
262
263         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
264                 rpc = crpc->crp_rpc;
265
266                 spin_lock(&rpc->crpc_lock);
267
268                 if (!crpc->crp_posted || /* not posted */
269                     crpc->crp_stamp_ns != 0) { /* rpc done or aborted already */
270                         if (crpc->crp_stamp_ns == 0) {
271                                 crpc->crp_stamp_ns = ktime_get_ns();
272                                 crpc->crp_status = -EINTR;
273                         }
274                         spin_unlock(&rpc->crpc_lock);
275                         continue;
276                 }
277
278                 crpc->crp_stamp_ns  = ktime_get_ns();
279                 crpc->crp_status = error;
280
281                 spin_unlock(&rpc->crpc_lock);
282
283                 sfw_abort_rpc(rpc);
284
285                 if (error != -ETIMEDOUT)
286                         continue;
287
288                 nd = crpc->crp_node;
289                 if (ktime_to_ns(nd->nd_stamp) > crpc->crp_stamp_ns)
290                         continue;
291
292                 nd->nd_stamp = ktime_set(0, crpc->crp_stamp_ns);
293                 nd->nd_state = LST_NODE_DOWN;
294         }
295 }
296
297 static int
298 lstcon_rpc_trans_check(struct lstcon_rpc_trans *trans)
299 {
300         if (console_session.ses_shutdown &&
301             !list_empty(&trans->tas_olink)) /* Not an end session RPC */
302                 return 1;
303
304         return (atomic_read(&trans->tas_remaining) == 0) ? 1: 0;
305 }
306
307 int
308 lstcon_rpc_trans_postwait(struct lstcon_rpc_trans *trans, int timeout)
309 {
310         struct lstcon_rpc *crpc;
311         int rc;
312
313         if (list_empty(&trans->tas_rpcs_list))
314                 return 0;
315
316         if (timeout < LST_TRANS_MIN_TIMEOUT)
317                 timeout = LST_TRANS_MIN_TIMEOUT;
318
319         CDEBUG(D_NET, "Transaction %s started\n",
320         lstcon_rpc_trans_name(trans->tas_opc));
321
322         /* post all requests */
323         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
324                 LASSERT(!crpc->crp_posted);
325
326                 lstcon_rpc_post(crpc);
327         }
328
329         mutex_unlock(&console_session.ses_mutex);
330
331         rc = wait_event_interruptible_timeout(trans->tas_waitq,
332                                               lstcon_rpc_trans_check(trans),
333                                               cfs_time_seconds(timeout));
334
335         rc = (rc > 0)? 0: ((rc < 0)? -EINTR: -ETIMEDOUT);
336
337         mutex_lock(&console_session.ses_mutex);
338
339         if (console_session.ses_shutdown)
340                 rc = -ESHUTDOWN;
341
342         if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) {
343                 /* treat short timeout as canceled */
344                 if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
345                         rc = -EINTR;
346
347                 lstcon_rpc_trans_abort(trans, rc);
348         }
349
350         CDEBUG(D_NET, "Transaction %s stopped: %d\n",
351                lstcon_rpc_trans_name(trans->tas_opc), rc);
352
353         lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
354
355         return rc;
356 }
357
358 static int
359 lstcon_rpc_get_reply(struct lstcon_rpc *crpc, struct srpc_msg **msgpp)
360 {
361         struct lstcon_node *nd = crpc->crp_node;
362         struct srpc_client_rpc *rpc = crpc->crp_rpc;
363         struct srpc_generic_reply *rep;
364
365         LASSERT(nd != NULL && rpc != NULL);
366         LASSERT(crpc->crp_stamp_ns != 0);
367
368         if (crpc->crp_status != 0) {
369                 *msgpp = NULL;
370                 return crpc->crp_status;
371         }
372
373         *msgpp = &rpc->crpc_replymsg;
374         if (!crpc->crp_unpacked) {
375                 sfw_unpack_message(*msgpp);
376                 crpc->crp_unpacked = 1;
377         }
378
379         if (ktime_to_ns(nd->nd_stamp) > crpc->crp_stamp_ns)
380                 return 0;
381
382         nd->nd_stamp = ktime_set(0, crpc->crp_stamp_ns);
383         rep = &(*msgpp)->msg_body.reply;
384
385         if (rep->sid.ses_nid == LNET_NID_ANY)
386                 nd->nd_state = LST_NODE_UNKNOWN;
387         else if (lstcon_session_match(rep->sid))
388                 nd->nd_state = LST_NODE_ACTIVE;
389         else
390                 nd->nd_state = LST_NODE_BUSY;
391
392         return 0;
393 }
394
395 void
396 lstcon_rpc_trans_stat(struct lstcon_rpc_trans *trans,
397                       struct lstcon_trans_stat *stat)
398 {
399         struct lstcon_rpc *crpc;
400         struct srpc_msg *rep;
401         int error;
402
403         LASSERT(stat != NULL);
404
405         memset(stat, 0, sizeof(*stat));
406
407         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
408                 lstcon_rpc_stat_total(stat, 1);
409
410                 LASSERT(crpc->crp_stamp_ns != 0);
411
412                 error = lstcon_rpc_get_reply(crpc, &rep);
413                 if (error != 0) {
414                         lstcon_rpc_stat_failure(stat, 1);
415                         if (stat->trs_rpc_errno == 0)
416                                 stat->trs_rpc_errno = -error;
417
418                         continue;
419                 }
420
421                 lstcon_rpc_stat_success(stat, 1);
422
423                 lstcon_rpc_stat_reply(trans, rep, crpc->crp_node, stat);
424         }
425
426         if (trans->tas_opc == LST_TRANS_SESNEW && stat->trs_fwk_errno == 0) {
427                 stat->trs_fwk_errno =
428                       lstcon_session_feats_check(trans->tas_features);
429         }
430
431         CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
432                       "RPC error(%d), Framework error(%d)\n",
433                lstcon_rpc_trans_name(trans->tas_opc),
434                lstcon_rpc_stat_success(stat, 0),
435                lstcon_rpc_stat_failure(stat, 0),
436                lstcon_rpc_stat_total(stat, 0),
437                stat->trs_rpc_errno, stat->trs_fwk_errno);
438 }
439
440 int
441 lstcon_rpc_trans_interpreter(struct lstcon_rpc_trans *trans,
442                              struct list_head __user *head_up,
443                              lstcon_rpc_readent_func_t readent)
444 {
445         struct list_head tmp;
446         struct list_head __user *next;
447         struct lstcon_rpc_ent *ent;
448         struct srpc_generic_reply *rep;
449         struct lstcon_rpc *crpc;
450         struct srpc_msg *msg;
451         struct lstcon_node *nd;
452         struct timespec64 ts;
453         int error;
454         s64 dur;
455
456         LASSERT(head_up != NULL);
457
458         next = head_up;
459
460         list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
461                 if (copy_from_user(&tmp, next,
462                                    sizeof(struct list_head)))
463                         return -EFAULT;
464
465                 if (tmp.next == head_up)
466                         return 0;
467
468                 next = tmp.next;
469
470                 ent = list_entry(next, struct lstcon_rpc_ent, rpe_link);
471
472                 LASSERT(crpc->crp_stamp_ns != 0);
473
474                 error = lstcon_rpc_get_reply(crpc, &msg);
475
476                 nd = crpc->crp_node;
477
478                 dur = crpc->crp_stamp_ns -
479                       console_session.ses_id.ses_stamp * NSEC_PER_MSEC;
480                 ts = ns_to_timespec64(dur);
481
482                 if (copy_to_user(&ent->rpe_peer,
483                                  &nd->nd_id, sizeof(struct lnet_process_id)) ||
484                     copy_to_user(&ent->rpe_stamp, &ts, sizeof(ts)) ||
485                     copy_to_user(&ent->rpe_state,
486                                  &nd->nd_state, sizeof(nd->nd_state)) ||
487                     copy_to_user(&ent->rpe_rpc_errno, &error,
488                                      sizeof(error)))
489                         return -EFAULT;
490
491                 if (error != 0)
492                         continue;
493
494                 /* RPC is done */
495                 rep = (struct srpc_generic_reply *)&msg->msg_body.reply;
496
497                 if (copy_to_user(&ent->rpe_sid,
498                                  &rep->sid, sizeof(rep->sid)) ||
499                     copy_to_user(&ent->rpe_fwk_errno,
500                                  &rep->status, sizeof(rep->status)))
501                         return -EFAULT;
502
503                 if (readent == NULL)
504                         continue;
505
506                 error = readent(trans->tas_opc, msg, ent);
507                 if (error != 0)
508                         return error;
509         }
510
511         return 0;
512 }
513
514 void
515 lstcon_rpc_trans_destroy(struct lstcon_rpc_trans *trans)
516 {
517         struct srpc_client_rpc *rpc;
518         struct lstcon_rpc *crpc;
519         struct lstcon_rpc *tmp;
520         int count = 0;
521
522         list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list, crp_link) {
523                 rpc = crpc->crp_rpc;
524
525                 spin_lock(&rpc->crpc_lock);
526
527                 /* free it if not posted or finished already */
528                 if (!crpc->crp_posted || crpc->crp_finished) {
529                         spin_unlock(&rpc->crpc_lock);
530
531                         list_del_init(&crpc->crp_link);
532                         lstcon_rpc_put(crpc);
533
534                         continue;
535                 }
536
537                 /* rpcs can be still not callbacked (even LNetMDUnlink is
538                  * called) because huge timeout for inaccessible network,
539                  * don't make user wait for them, just abandon them, they
540                  * will be recycled in callback */
541
542                 LASSERT(crpc->crp_status != 0);
543
544                 crpc->crp_node  = NULL;
545                 crpc->crp_trans = NULL;
546                 list_del_init(&crpc->crp_link);
547                 count++;
548
549                 spin_unlock(&rpc->crpc_lock);
550
551                 atomic_dec(&trans->tas_remaining);
552         }
553
554         LASSERT(atomic_read(&trans->tas_remaining) == 0);
555
556         list_del(&trans->tas_link);
557         if (!list_empty(&trans->tas_olink))
558                 list_del(&trans->tas_olink);
559
560         CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
561                lstcon_rpc_trans_name(trans->tas_opc), count);
562
563         LIBCFS_FREE(trans, sizeof(*trans));
564 }
565
566 int
567 lstcon_sesrpc_prep(struct lstcon_node *nd, int transop,
568                    unsigned int feats, struct lstcon_rpc **crpc)
569 {
570         struct srpc_mksn_reqst *msrq;
571         struct srpc_rmsn_reqst *rsrq;
572         int rc;
573
574         switch (transop) {
575         case LST_TRANS_SESNEW:
576                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION,
577                                      feats, 0, 0, crpc);
578                 if (rc != 0)
579                         return rc;
580
581                 msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
582                 msrq->mksn_sid.ses_stamp = console_session.ses_id.ses_stamp;
583                 msrq->mksn_sid.ses_nid =
584                         lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
585                 msrq->mksn_force = console_session.ses_force;
586                 strscpy(msrq->mksn_name, console_session.ses_name,
587                         sizeof(msrq->mksn_name));
588                 break;
589
590         case LST_TRANS_SESEND:
591                 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION,
592                                      feats, 0, 0, crpc);
593                 if (rc != 0)
594                         return rc;
595
596                 rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
597                 rsrq->rmsn_sid.ses_stamp = console_session.ses_id.ses_stamp;
598                 rsrq->rmsn_sid.ses_nid =
599                         lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
600                 break;
601
602         default:
603                 LBUG();
604         }
605
606         return 0;
607 }
608
609 int
610 lstcon_dbgrpc_prep(struct lstcon_node *nd, unsigned int feats,
611                    struct lstcon_rpc **crpc)
612 {
613         struct srpc_debug_reqst *drq;
614         int rc;
615
616         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, feats, 0, 0, crpc);
617         if (rc != 0)
618                 return rc;
619
620         drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
621
622         drq->dbg_sid.ses_stamp = console_session.ses_id.ses_stamp;
623         drq->dbg_sid.ses_nid =
624                 lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
625         drq->dbg_flags = 0;
626
627         return rc;
628 }
629
630 int
631 lstcon_batrpc_prep(struct lstcon_node *nd, int transop, unsigned int feats,
632                    struct lstcon_tsb_hdr *tsb, struct lstcon_rpc **crpc)
633 {
634         struct lstcon_batch *batch;
635         struct srpc_batch_reqst *brq;
636         int rc;
637
638         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, feats, 0, 0, crpc);
639         if (rc != 0)
640                 return rc;
641
642         brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
643
644         brq->bar_sid.ses_stamp = console_session.ses_id.ses_stamp;
645         brq->bar_sid.ses_nid =
646                 lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
647         brq->bar_bid     = tsb->tsb_id;
648         brq->bar_testidx = tsb->tsb_index;
649         brq->bar_opc     = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
650                            (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP:
651                             SRPC_BATCH_OPC_QUERY);
652
653         if (transop != LST_TRANS_TSBRUN &&
654             transop != LST_TRANS_TSBSTOP)
655                 return 0;
656
657         LASSERT (tsb->tsb_index == 0);
658
659         batch = (struct lstcon_batch *)tsb;
660         brq->bar_arg = batch->bat_arg;
661
662         return 0;
663 }
664
665 int
666 lstcon_statrpc_prep(struct lstcon_node *nd, unsigned int feats,
667                     struct lstcon_rpc **crpc)
668 {
669         struct srpc_stat_reqst *srq;
670         int rc;
671
672         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, feats, 0, 0, crpc);
673         if (rc != 0)
674                 return rc;
675
676         srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
677
678
679         srq->str_sid.ses_stamp = console_session.ses_id.ses_stamp;
680         srq->str_sid.ses_nid =
681                 lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
682         srq->str_type = 0; /* XXX remove it */
683
684         return 0;
685 }
686
687 static struct lnet_process_id_packed *
688 lstcon_next_id(int idx, int nkiov, struct bio_vec *kiov)
689 {
690         struct lnet_process_id_packed *pid;
691         int                       i;
692
693         i = idx / SFW_ID_PER_PAGE;
694
695         LASSERT (i < nkiov);
696
697         pid = (struct lnet_process_id_packed *)page_address(kiov[i].bv_page);
698
699         return &pid[idx % SFW_ID_PER_PAGE];
700 }
701
702 static int
703 lstcon_dstnodes_prep(struct lstcon_group *grp, int idx,
704                      int dist, int span, int nkiov, struct bio_vec *kiov)
705 {
706         struct lnet_process_id_packed *pid;
707         struct lstcon_ndlink *ndl;
708         struct lstcon_node *nd;
709         int start;
710         int end;
711         int i = 0;
712
713         LASSERT (dist >= 1);
714         LASSERT (span >= 1);
715         LASSERT (grp->grp_nnode >= 1);
716
717         if (span > grp->grp_nnode)
718                 return -EINVAL;
719
720         start = ((idx / dist) * span) % grp->grp_nnode;
721         end   = ((idx / dist) * span + span - 1) % grp->grp_nnode;
722
723         list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
724                 nd = ndl->ndl_node;
725                 if (i < start) {
726                         i++;
727                         continue;
728                 }
729
730                 if (i > (end >= start ? end : grp->grp_nnode))
731                         break;
732
733                 pid = lstcon_next_id((i - start), nkiov, kiov);
734                 pid->nid = nd->nd_id.nid;
735                 pid->pid = nd->nd_id.pid;
736                 i++;
737         }
738
739         if (start <= end)       /* done */
740                 return 0;
741
742         list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
743                 if (i > grp->grp_nnode + end)
744                         break;
745
746                 nd = ndl->ndl_node;
747                 pid = lstcon_next_id((i - start), nkiov, kiov);
748                 pid->nid = nd->nd_id.nid;
749                 pid->pid = nd->nd_id.pid;
750                 i++;
751         }
752
753         return 0;
754 }
755
756 static int
757 lstcon_pingrpc_prep(struct lst_test_ping_param *param,
758                     struct srpc_test_reqst *req)
759 {
760         struct test_ping_req *prq = &req->tsr_u.ping;
761
762         if (param) {
763                 prq->png_size   = param->png_size;
764                 prq->png_flags  = param->png_flags;
765         } else {
766                 prq->png_size   = 0;
767                 prq->png_flags  = 0;
768         }
769         /* TODO dest */
770         return 0;
771 }
772
773 static int
774 lstcon_bulkrpc_v0_prep(struct lst_test_bulk_param *param,
775                        struct srpc_test_reqst *req)
776 {
777         struct test_bulk_req *brq = &req->tsr_u.bulk_v0;
778
779         brq->blk_opc    = param->blk_opc;
780         brq->blk_npg    = (param->blk_size + PAGE_SIZE - 1) /
781                            PAGE_SIZE;
782         brq->blk_flags  = param->blk_flags;
783
784         return 0;
785 }
786
787 static int
788 lstcon_bulkrpc_v1_prep(struct lst_test_bulk_param *param, bool is_client,
789                        struct srpc_test_reqst *req)
790 {
791         struct test_bulk_req_v1 *brq = &req->tsr_u.bulk_v1;
792
793         brq->blk_opc    = param->blk_opc;
794         brq->blk_flags  = param->blk_flags;
795         brq->blk_len    = param->blk_size;
796         brq->blk_offset = is_client ? param->blk_cli_off : param->blk_srv_off;
797
798         return 0;
799 }
800
801 int
802 lstcon_testrpc_prep(struct lstcon_node *nd, int transop, unsigned int feats,
803                     struct lstcon_test *test, struct lstcon_rpc **crpc)
804 {
805         struct lstcon_group *sgrp = test->tes_src_grp;
806         struct lstcon_group *dgrp = test->tes_dst_grp;
807         struct srpc_test_reqst *trq;
808         struct srpc_bulk *bulk;
809         int i;
810         int npg = 0;
811         int nob = 0;
812         int rc = 0;
813
814         if (transop == LST_TRANS_TSBCLIADD) {
815                 npg = sfw_id_pages(test->tes_span);
816                 nob = (feats & LST_FEAT_BULK_LEN) == 0 ?
817                       npg * PAGE_SIZE :
818                       sizeof(struct lnet_process_id_packed) * test->tes_span;
819         }
820
821         rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, feats, npg, nob, crpc);
822         if (rc != 0)
823                 return rc;
824
825         trq  = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
826
827         if (transop == LST_TRANS_TSBSRVADD) {
828                 int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist;
829                 int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span;
830                 int nmax = (ndist + nspan - 1) / nspan;
831
832                 trq->tsr_ndest = 0;
833                 trq->tsr_loop  = nmax * test->tes_dist * test->tes_concur;
834
835         } else {
836                 bulk = &(*crpc)->crp_rpc->crpc_bulk;
837
838                 for (i = 0; i < npg; i++) {
839                         int     len;
840
841                         LASSERT(nob > 0);
842
843                         len = (feats & LST_FEAT_BULK_LEN) == 0 ?
844                               PAGE_SIZE : min_t(int, nob, PAGE_SIZE);
845                         nob -= len;
846
847                         bulk->bk_iovs[i].bv_offset = 0;
848                         bulk->bk_iovs[i].bv_len    = len;
849                         bulk->bk_iovs[i].bv_page   =
850                                 alloc_page(GFP_KERNEL);
851
852                         if (bulk->bk_iovs[i].bv_page == NULL) {
853                                 lstcon_rpc_put(*crpc);
854                                 return -ENOMEM;
855                         }
856                 }
857
858                 bulk->bk_sink = 0;
859
860                 LASSERT (transop == LST_TRANS_TSBCLIADD);
861
862                 rc = lstcon_dstnodes_prep(test->tes_dst_grp,
863                                           test->tes_cliidx++,
864                                           test->tes_dist,
865                                           test->tes_span,
866                                           npg, &bulk->bk_iovs[0]);
867                 if (rc != 0) {
868                         lstcon_rpc_put(*crpc);
869                         return rc;
870                 }
871
872                 trq->tsr_ndest = test->tes_span;
873                 trq->tsr_loop  = test->tes_loop;
874         }
875
876         trq->tsr_sid.ses_stamp = console_session.ses_id.ses_stamp;
877         trq->tsr_sid.ses_nid =
878                 lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
879         trq->tsr_bid        = test->tes_hdr.tsb_id;
880         trq->tsr_concur     = test->tes_concur;
881         trq->tsr_is_client  = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
882         trq->tsr_stop_onerr = !!test->tes_stop_onerr;
883
884         switch (test->tes_type) {
885         case LST_TEST_PING: {
886                 struct lst_test_ping_param *data = NULL;
887
888                 trq->tsr_service = SRPC_SERVICE_PING;
889                 if (test->tes_paramlen)
890                         data = ((struct lst_test_ping_param *)
891                                 &test->tes_param[0]);
892
893                 rc = lstcon_pingrpc_prep(data, trq);
894                 break;
895         }
896         case LST_TEST_BULK:
897                 trq->tsr_service = SRPC_SERVICE_BRW;
898                 if ((feats & LST_FEAT_BULK_LEN) == 0) {
899                         rc = lstcon_bulkrpc_v0_prep((struct lst_test_bulk_param *)
900                                                     &test->tes_param[0], trq);
901                 } else {
902                         rc = lstcon_bulkrpc_v1_prep((struct lst_test_bulk_param *)
903                                                     &test->tes_param[0],
904                                                     trq->tsr_is_client, trq);
905                 }
906
907                 break;
908         default:
909                 LBUG();
910                 break;
911         }
912
913         return rc;
914 }
915
916 static int
917 lstcon_sesnew_stat_reply(struct lstcon_rpc_trans *trans,
918                          struct lstcon_node *nd, struct srpc_msg *reply)
919 {
920         struct srpc_mksn_reply *mksn_rep = &reply->msg_body.mksn_reply;
921         int status = mksn_rep->mksn_status;
922
923         if (status == 0 &&
924             (reply->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
925                 mksn_rep->mksn_status = EPROTO;
926                 status = EPROTO;
927         }
928
929         if (status == EPROTO) {
930                 CNETERR("session protocol error from %s: %u\n",
931                         libcfs_nid2str(nd->nd_id.nid),
932                         reply->msg_ses_feats);
933         }
934
935         if (status != 0)
936                 return status;
937
938         if (!trans->tas_feats_updated) {
939                 spin_lock(&console_session.ses_rpc_lock);
940                 if (!trans->tas_feats_updated) { /* recheck with lock */
941                         trans->tas_feats_updated = 1;
942                         trans->tas_features = reply->msg_ses_feats;
943                 }
944                 spin_unlock(&console_session.ses_rpc_lock);
945         }
946
947         if (reply->msg_ses_feats != trans->tas_features) {
948                 CNETERR("Framework features %x from %s is different with "
949                         "features on this transaction: %x\n",
950                          reply->msg_ses_feats, libcfs_nid2str(nd->nd_id.nid),
951                          trans->tas_features);
952                 status = mksn_rep->mksn_status = EPROTO;
953         }
954
955         if (status == 0) {
956                 /* session timeout on remote node */
957                 nd->nd_timeout = mksn_rep->mksn_timeout;
958         }
959
960         return status;
961 }
962
963 void
964 lstcon_rpc_stat_reply(struct lstcon_rpc_trans *trans, struct srpc_msg *msg,
965                       struct lstcon_node *nd, struct lstcon_trans_stat *stat)
966 {
967         struct srpc_rmsn_reply *rmsn_rep;
968         struct srpc_debug_reply *dbg_rep;
969         struct srpc_batch_reply *bat_rep;
970         struct srpc_test_reply *test_rep;
971         struct srpc_stat_reply *stat_rep;
972         int rc = 0;
973
974         switch (trans->tas_opc) {
975         case LST_TRANS_SESNEW:
976                 rc = lstcon_sesnew_stat_reply(trans, nd, msg);
977                 if (rc == 0) {
978                         lstcon_sesop_stat_success(stat, 1);
979                         return;
980                 }
981
982                 lstcon_sesop_stat_failure(stat, 1);
983                 break;
984
985         case LST_TRANS_SESEND:
986                 rmsn_rep = &msg->msg_body.rmsn_reply;
987                 /* ESRCH is not an error for end session */
988                 if (rmsn_rep->rmsn_status == 0 ||
989                     rmsn_rep->rmsn_status == ESRCH) {
990                         lstcon_sesop_stat_success(stat, 1);
991                         return;
992                 }
993
994                 lstcon_sesop_stat_failure(stat, 1);
995                 rc = rmsn_rep->rmsn_status;
996                 break;
997
998         case LST_TRANS_SESQRY:
999         case LST_TRANS_SESPING:
1000                 dbg_rep = &msg->msg_body.dbg_reply;
1001
1002                 if (dbg_rep->dbg_status == ESRCH) {
1003                         lstcon_sesqry_stat_unknown(stat, 1);
1004                         return;
1005                 }
1006
1007                 if (lstcon_session_match(dbg_rep->dbg_sid))
1008                         lstcon_sesqry_stat_active(stat, 1);
1009                 else
1010                         lstcon_sesqry_stat_busy(stat, 1);
1011                 return;
1012
1013         case LST_TRANS_TSBRUN:
1014         case LST_TRANS_TSBSTOP:
1015                 bat_rep = &msg->msg_body.bat_reply;
1016
1017                 if (bat_rep->bar_status == 0) {
1018                         lstcon_tsbop_stat_success(stat, 1);
1019                         return;
1020                 }
1021
1022                 if (bat_rep->bar_status == EPERM &&
1023                     trans->tas_opc == LST_TRANS_TSBSTOP) {
1024                         lstcon_tsbop_stat_success(stat, 1);
1025                         return;
1026                 }
1027
1028                 lstcon_tsbop_stat_failure(stat, 1);
1029                 rc = bat_rep->bar_status;
1030                 break;
1031
1032         case LST_TRANS_TSBCLIQRY:
1033         case LST_TRANS_TSBSRVQRY:
1034                 bat_rep = &msg->msg_body.bat_reply;
1035
1036                 if (bat_rep->bar_active != 0)
1037                         lstcon_tsbqry_stat_run(stat, 1);
1038                 else
1039                         lstcon_tsbqry_stat_idle(stat, 1);
1040
1041                 if (bat_rep->bar_status == 0)
1042                         return;
1043
1044                 lstcon_tsbqry_stat_failure(stat, 1);
1045                 rc = bat_rep->bar_status;
1046                 break;
1047
1048         case LST_TRANS_TSBCLIADD:
1049         case LST_TRANS_TSBSRVADD:
1050                 test_rep = &msg->msg_body.tes_reply;
1051
1052                 if (test_rep->tsr_status == 0) {
1053                         lstcon_tsbop_stat_success(stat, 1);
1054                         return;
1055                 }
1056
1057                 lstcon_tsbop_stat_failure(stat, 1);
1058                 rc = test_rep->tsr_status;
1059                 break;
1060
1061         case LST_TRANS_STATQRY:
1062                 stat_rep = &msg->msg_body.stat_reply;
1063
1064                 if (stat_rep->str_status == 0) {
1065                         lstcon_statqry_stat_success(stat, 1);
1066                         return;
1067                 }
1068
1069                 lstcon_statqry_stat_failure(stat, 1);
1070                 rc = stat_rep->str_status;
1071                 break;
1072
1073         default:
1074                 LBUG();
1075         }
1076
1077         if (stat->trs_fwk_errno == 0)
1078                 stat->trs_fwk_errno = rc;
1079 }
1080
1081 int
1082 lstcon_rpc_trans_ndlist(struct list_head *ndlist,
1083                         struct list_head *translist, int transop,
1084                         void *arg, lstcon_rpc_cond_func_t condition,
1085                         struct lstcon_rpc_trans **transpp)
1086 {
1087         struct lstcon_rpc_trans *trans;
1088         struct lstcon_ndlink *ndl;
1089         struct lstcon_node *nd;
1090         struct lstcon_rpc *rpc;
1091         unsigned int feats;
1092         int rc;
1093
1094         /* Creating session RPG for list of nodes */
1095
1096         rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1097         if (rc != 0) {
1098                 CERROR("Can't create transaction %d: %d\n", transop, rc);
1099                 return rc;
1100         }
1101
1102         feats = trans->tas_features;
1103         list_for_each_entry(ndl, ndlist, ndl_link) {
1104                 rc = condition == NULL ? 1 :
1105                      condition(transop, ndl->ndl_node, arg);
1106
1107                 if (rc == 0)
1108                         continue;
1109
1110                 if (rc < 0) {
1111                         CDEBUG(D_NET, "Condition error while creating RPC "
1112                                       " for transaction %d: %d\n", transop, rc);
1113                         break;
1114                 }
1115
1116                 nd = ndl->ndl_node;
1117
1118                 switch (transop) {
1119                 case LST_TRANS_SESNEW:
1120                 case LST_TRANS_SESEND:
1121                         rc = lstcon_sesrpc_prep(nd, transop, feats, &rpc);
1122                         break;
1123                 case LST_TRANS_SESQRY:
1124                 case LST_TRANS_SESPING:
1125                         rc = lstcon_dbgrpc_prep(nd, feats, &rpc);
1126                         break;
1127                 case LST_TRANS_TSBCLIADD:
1128                 case LST_TRANS_TSBSRVADD:
1129                         rc = lstcon_testrpc_prep(nd, transop, feats,
1130                                                  (struct lstcon_test *)arg,
1131                                                  &rpc);
1132                         break;
1133                 case LST_TRANS_TSBRUN:
1134                 case LST_TRANS_TSBSTOP:
1135                 case LST_TRANS_TSBCLIQRY:
1136                 case LST_TRANS_TSBSRVQRY:
1137                         rc = lstcon_batrpc_prep(nd, transop, feats,
1138                                                 (struct lstcon_tsb_hdr *)arg,
1139                                                 &rpc);
1140                         break;
1141                 case LST_TRANS_STATQRY:
1142                         rc = lstcon_statrpc_prep(nd, feats, &rpc);
1143                         break;
1144                 default:
1145                         rc = -EINVAL;
1146                         break;
1147                 }
1148
1149                 if (rc != 0) {
1150                         CERROR("Failed to create RPC for transaction %s: %d\n",
1151                                lstcon_rpc_trans_name(transop), rc);
1152                         break;
1153                 }
1154
1155                 lstcon_rpc_trans_addreq(trans, rpc);
1156         }
1157
1158         if (rc == 0) {
1159                 *transpp = trans;
1160                 return 0;
1161         }
1162
1163         lstcon_rpc_trans_destroy(trans);
1164
1165         return rc;
1166 }
1167
1168 static void
1169 lstcon_rpc_pinger(void *arg)
1170 {
1171         struct stt_timer *ptimer = arg;
1172         struct lstcon_rpc_trans *trans;
1173         struct lstcon_rpc *crpc;
1174         struct srpc_msg *rep;
1175         struct srpc_debug_reqst *drq;
1176         struct lstcon_ndlink *ndl;
1177         struct lstcon_node *nd;
1178         int intv;
1179         int count = 0;
1180         int rc;
1181
1182         /* RPC pinger is a special case of transaction,
1183          * it's called by timer at 8 seconds interval.
1184          */
1185         mutex_lock(&console_session.ses_mutex);
1186
1187         if (console_session.ses_shutdown || console_session.ses_expired) {
1188                 mutex_unlock(&console_session.ses_mutex);
1189                 return;
1190         }
1191
1192         if (!console_session.ses_expired &&
1193             ktime_get_real_seconds() - console_session.ses_laststamp >
1194             (time64_t)console_session.ses_timeout)
1195                 console_session.ses_expired = 1;
1196
1197         trans = console_session.ses_ping;
1198
1199         LASSERT(trans != NULL);
1200
1201         list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) {
1202                 nd = ndl->ndl_node;
1203
1204                 if (console_session.ses_expired) {
1205                         /* idle console, end session on all nodes */
1206                         if (nd->nd_state != LST_NODE_ACTIVE)
1207                                 continue;
1208
1209                         rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND,
1210                                                 trans->tas_features, &crpc);
1211                         if (rc != 0) {
1212                                 CERROR("Out of memory\n");
1213                                 break;
1214                         }
1215
1216                         lstcon_rpc_trans_addreq(trans, crpc);
1217                         lstcon_rpc_post(crpc);
1218
1219                         continue;
1220                 }
1221
1222                 crpc = &nd->nd_ping;
1223
1224                 if (crpc->crp_rpc != NULL) {
1225                         LASSERT(crpc->crp_trans == trans);
1226                         LASSERT(!list_empty(&crpc->crp_link));
1227
1228                         spin_lock(&crpc->crp_rpc->crpc_lock);
1229
1230                         LASSERT(crpc->crp_posted);
1231
1232                         if (!crpc->crp_finished) {
1233                                 /* in flight */
1234                                 spin_unlock(&crpc->crp_rpc->crpc_lock);
1235                                 continue;
1236                         }
1237
1238                         spin_unlock(&crpc->crp_rpc->crpc_lock);
1239
1240                         lstcon_rpc_get_reply(crpc, &rep);
1241
1242                         list_del_init(&crpc->crp_link);
1243
1244                         lstcon_rpc_put(crpc);
1245                 }
1246
1247                 if (nd->nd_state != LST_NODE_ACTIVE)
1248                         continue;
1249
1250                 intv = div_u64(ktime_ms_delta(ktime_get(), nd->nd_stamp),
1251                                MSEC_PER_SEC);
1252                 if (intv < nd->nd_timeout / 2)
1253                         continue;
1254
1255                 rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG,
1256                                      trans->tas_features, 0, 0, 1, crpc);
1257                 if (rc != 0) {
1258                         CERROR("Out of memory\n");
1259                         break;
1260                 }
1261
1262                 drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1263
1264                 drq->dbg_sid.ses_stamp = console_session.ses_id.ses_stamp;
1265                 drq->dbg_sid.ses_nid =
1266                         lnet_nid_to_nid4(&console_session.ses_id.ses_nid);
1267                 drq->dbg_flags = 0;
1268
1269                 lstcon_rpc_trans_addreq(trans, crpc);
1270                 lstcon_rpc_post(crpc);
1271
1272                 count++;
1273         }
1274
1275         if (console_session.ses_expired) {
1276                 mutex_unlock(&console_session.ses_mutex);
1277                 return;
1278         }
1279
1280         CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1281
1282         ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1283         stt_add_timer(ptimer);
1284
1285         mutex_unlock(&console_session.ses_mutex);
1286 }
1287
1288 int
1289 lstcon_rpc_pinger_start(void)
1290 {
1291         struct stt_timer *ptimer;
1292         int rc;
1293
1294         LASSERT(list_empty(&console_session.ses_rpc_freelist));
1295         LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0);
1296
1297         rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1298                                    &console_session.ses_ping);
1299         if (rc != 0) {
1300                 CERROR("Failed to create console pinger\n");
1301                 return rc;
1302         }
1303
1304         ptimer = &console_session.ses_ping_timer;
1305         ptimer->stt_expires = ktime_get_real_seconds() + LST_PING_INTERVAL;
1306
1307         stt_add_timer(ptimer);
1308
1309         return 0;
1310 }
1311
1312 void
1313 lstcon_rpc_pinger_stop(void)
1314 {
1315         LASSERT (console_session.ses_shutdown);
1316
1317         stt_del_timer(&console_session.ses_ping_timer);
1318
1319         lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1320         lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1321         lstcon_rpc_trans_destroy(console_session.ses_ping);
1322
1323         memset(lstcon_trans_stat(), 0, sizeof(struct lstcon_trans_stat));
1324
1325         console_session.ses_ping = NULL;
1326 }
1327
1328 void
1329 lstcon_rpc_cleanup_wait(void)
1330 {
1331         struct lstcon_rpc_trans *trans;
1332         struct lstcon_rpc *crpc;
1333         struct list_head *pacer;
1334         LIST_HEAD(zlist);
1335
1336         /* Called with hold of global mutex */
1337
1338         LASSERT(console_session.ses_shutdown);
1339
1340         while (!list_empty(&console_session.ses_trans_list)) {
1341                 list_for_each(pacer, &console_session.ses_trans_list) {
1342                         trans = list_entry(pacer, struct lstcon_rpc_trans,
1343                                            tas_link);
1344
1345                         CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1346                                lstcon_rpc_trans_name(trans->tas_opc));
1347
1348                         wake_up(&trans->tas_waitq);
1349                 }
1350
1351                 mutex_unlock(&console_session.ses_mutex);
1352
1353                 CWARN("Session is shutting down, "
1354                       "waiting for termination of transactions\n");
1355                 schedule_timeout_uninterruptible(cfs_time_seconds(1));
1356
1357                 mutex_lock(&console_session.ses_mutex);
1358         }
1359
1360         spin_lock(&console_session.ses_rpc_lock);
1361
1362         lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0),
1363                        console_session.ses_rpc_lock,
1364                        "Network is not accessable or target is down, "
1365                        "waiting for %d console RPCs to being recycled\n",
1366                        atomic_read(&console_session.ses_rpc_counter));
1367
1368         list_splice_init(&console_session.ses_rpc_freelist, &zlist);
1369
1370         spin_unlock(&console_session.ses_rpc_lock);
1371
1372         while (!list_empty(&zlist)) {
1373                 crpc = list_first_entry(&zlist, struct lstcon_rpc, crp_link);
1374
1375                 list_del(&crpc->crp_link);
1376                 LIBCFS_FREE(crpc, sizeof(*crpc));
1377         }
1378 }
1379
1380 int
1381 lstcon_rpc_module_init(void)
1382 {
1383         INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1384         console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1385         console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1386
1387         console_session.ses_ping = NULL;
1388
1389         spin_lock_init(&console_session.ses_rpc_lock);
1390         atomic_set(&console_session.ses_rpc_counter, 0);
1391         INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1392
1393         return 0;
1394 }
1395
1396 void
1397 lstcon_rpc_module_fini(void)
1398 {
1399         LASSERT(list_empty(&console_session.ses_rpc_freelist));
1400         LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0);
1401 }
1402