Whamcloud - gitweb
- added LNET self test (landing b_self_test).
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Authors: Isaac Huang <isaac@clusterfs.com>
6  *            Liang Zhen  <liangzhen@clusterfs.com>
7  */
8
9 #define DEBUG_SUBSYSTEM S_LNET
10
11 #include <libcfs/kp30.h>
12 #include <libcfs/libcfs.h>
13 #include <lnet/lib-lnet.h>
14
15 #include "selftest.h"
16
17
18 static int session_timeout = 100;
19 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
20                 "test session timeout in seconds (100 by default, 0 == never)");
21
22 #define SFW_TEST_CONCURRENCY     128
23 #define SFW_TEST_RPC_TIMEOUT     64
24 #define SFW_CLIENT_RPC_TIMEOUT   64  /* in seconds */
25 #define SFW_EXTRA_TEST_BUFFERS   8 /* tolerate buggy peers with extra buffers */
26
27 #define sfw_test_buffers(tsi)    ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS)
28
/*
 * In-place byte-swap helpers for wire structures received from a peer whose
 * message magic is the byte-reversed SRPC_MSG_MAGIC.
 */
#define sfw_unpack_id(id)                       \
do {                                            \
        __swab64s(&(id).nid);                   \
        __swab32s(&(id).pid);                   \
} while (0)

#define sfw_unpack_sid(sid)                     \
do {                                            \
        __swab64s(&(sid).ses_nid);              \
        __swab64s(&(sid).ses_stamp);            \
} while (0)

#define sfw_unpack_fw_counters(fc)              \
do {                                            \
        __swab32s(&(fc).brw_errors);            \
        __swab32s(&(fc).active_tests);          \
        __swab32s(&(fc).active_batches);        \
        __swab32s(&(fc).zombie_sessions);       \
} while (0)

#define sfw_unpack_rpc_counters(rc)             \
do {                                            \
        __swab32s(&(rc).errors);                \
        __swab32s(&(rc).rpcs_sent);             \
        __swab32s(&(rc).rpcs_rcvd);             \
        __swab32s(&(rc).rpcs_dropped);          \
        __swab32s(&(rc).rpcs_expired);          \
        __swab64s(&(rc).bulk_get);              \
        __swab64s(&(rc).bulk_put);              \
} while (0)

#define sfw_unpack_lnet_counters(lc)            \
do {                                            \
        __swab32s(&(lc).errors);                \
        __swab32s(&(lc).msgs_max);              \
        __swab32s(&(lc).msgs_alloc);            \
        __swab32s(&(lc).send_count);            \
        __swab32s(&(lc).recv_count);            \
        __swab32s(&(lc).drop_count);            \
        __swab32s(&(lc).route_count);           \
        __swab64s(&(lc).send_length);           \
        __swab64s(&(lc).recv_length);           \
        __swab64s(&(lc).drop_length);           \
        __swab64s(&(lc).route_length);          \
} while (0)

/* non-zero while any test unit of instance t is still running */
#define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
/* non-zero while any client instance of batch b is still running */
#define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
77
78 struct smoketest_framework {
79         struct list_head   fw_zombie_rpcs;     /* RPCs to be recycled */
80         struct list_head   fw_zombie_sessions; /* stopping sessions */
81         struct list_head   fw_tests;           /* registered test cases */
82         atomic_t           fw_nzombies;        /* # zombie sessions */
83         spinlock_t         fw_lock;            /* serialise */
84         sfw_session_t     *fw_session;         /* _the_ session */
85         int                fw_shuttingdown;    /* shutdown in progress */
86         srpc_server_rpc_t *fw_active_srpc;     /* running RPC */
87 } sfw_data;
88
89 /* forward ref's */
90 int sfw_stop_batch (sfw_batch_t *tsb, int force);
91 void sfw_destroy_session (sfw_session_t *sn);
92
93 static inline sfw_test_case_t *
94 sfw_find_test_case(int id)
95 {
96         sfw_test_case_t *tsc;
97
98         LASSERT (id <= SRPC_SERVICE_MAX_ID);
99         LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
100
101         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
102                 if (tsc->tsc_srv_service->sv_id == id)
103                         return tsc;
104         }
105
106         return NULL;
107 }
108
109 static int
110 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
111 {
112         sfw_test_case_t *tsc;
113
114         if (sfw_find_test_case(service->sv_id) != NULL) {
115                 CERROR ("Failed to register test %s (%d)\n",
116                         service->sv_name, service->sv_id);
117                 return -EEXIST;
118         }
119
120         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
121         if (tsc == NULL)
122                 return -ENOMEM;
123
124         memset(tsc, 0, sizeof(sfw_test_case_t));
125         tsc->tsc_cli_ops     = cliops;
126         tsc->tsc_srv_service = service;
127
128         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
129         return 0;
130 }
131
132 void
133 sfw_add_session_timer (void)
134 {
135         sfw_session_t *sn = sfw_data.fw_session;
136         stt_timer_t   *timer = &sn->sn_timer;
137
138         LASSERT (!sfw_data.fw_shuttingdown);
139
140         if (sn == NULL || sn->sn_timeout == 0)
141                 return;
142
143         LASSERT (!sn->sn_timer_active);
144
145         sn->sn_timer_active = 1;
146         timer->stt_expires = cfs_time_add(sn->sn_timeout,
147                                           cfs_time_current_sec());
148         stt_add_timer(timer);
149         return;
150 }
151
152 int
153 sfw_del_session_timer (void)
154 {
155         sfw_session_t *sn = sfw_data.fw_session;
156
157         if (sn == NULL || !sn->sn_timer_active)
158                 return 0;
159
160         LASSERT (sn->sn_timeout != 0);
161
162         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
163                 sn->sn_timer_active = 0;
164                 return 0;
165         }
166
167 #ifndef __KERNEL__
168         /* Racing is impossible in single-threaded userland selftest */
169         LBUG();
170 #endif
171         return EBUSY; /* racing with sfw_session_expired() */
172 }
173
174 /* called with sfw_data.fw_lock held */
175 static void
176 sfw_deactivate_session (void)
177 {
178         sfw_session_t *sn = sfw_data.fw_session;
179         int            nactive = 0;
180         sfw_batch_t   *tsb;
181
182         if (sn == NULL)
183                 return;
184
185         LASSERT (!sn->sn_timer_active);
186
187         sfw_data.fw_session = NULL;
188         atomic_inc(&sfw_data.fw_nzombies);
189         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
190
191         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
192                 if (sfw_batch_active(tsb)) {
193                         nactive++;
194                         sfw_stop_batch(tsb, 1);
195                 }
196         }
197
198         if (nactive != 0)
199                 return;   /* wait for active batches to stop */
200
201         list_del_init(&sn->sn_list);
202         spin_unlock(&sfw_data.fw_lock);
203
204         sfw_destroy_session(sn);
205
206         spin_lock(&sfw_data.fw_lock);
207         return;
208 }
209
210 #ifndef __KERNEL__
211
212 int
213 sfw_session_removed (void)
214 {
215         return (sfw_data.fw_session == NULL) ? 1 : 0;
216 }
217
218 #endif
219
220 void
221 sfw_session_expired (void *data)
222 {
223         sfw_session_t *sn = data;
224
225         spin_lock(&sfw_data.fw_lock);
226
227         LASSERT (sn->sn_timer_active);
228         LASSERT (sn == sfw_data.fw_session);
229
230         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
231                libcfs_nid2str(sn->sn_id.ses_nid),
232                sn->sn_id.ses_stamp, &sn->sn_name[0]);
233
234         sn->sn_timer_active = 0;
235         sfw_deactivate_session();
236
237         spin_unlock(&sfw_data.fw_lock);
238         return;
239 }
240
241 static inline void
242 sfw_init_session (sfw_session_t *sn, lst_sid_t sid, const char *name)
243 {
244         stt_timer_t *timer = &sn->sn_timer;
245
246         memset(sn, 0, sizeof(sfw_session_t));
247         CFS_INIT_LIST_HEAD(&sn->sn_list);
248         CFS_INIT_LIST_HEAD(&sn->sn_batches);
249         atomic_set(&sn->sn_brw_errors, 0);
250         strncpy(&sn->sn_name[0], name, LST_NAME_SIZE);
251
252         sn->sn_timer_active = 0;
253         sn->sn_id           = sid;
254         sn->sn_timeout      = session_timeout;
255
256         timer->stt_data = sn;
257         timer->stt_func = sfw_session_expired;
258         CFS_INIT_LIST_HEAD(&timer->stt_list);
259 }
260
261 /* completion handler for incoming framework RPCs */
262 void
263 sfw_server_rpc_done (srpc_server_rpc_t *rpc)
264 {
265         srpc_service_t *sv = rpc->srpc_service;
266         int             status = rpc->srpc_status;
267
268         CDEBUG (D_NET,
269                 "Incoming framework RPC done: "
270                 "service %s, peer %s, status %s:%d\n",
271                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
272                 swi_state2str(rpc->srpc_wi.wi_state),
273                 status);
274
275         if (rpc->srpc_bulk != NULL)
276                 sfw_free_pages(rpc);
277         return;
278 }
279
280 void
281 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
282 {
283         LASSERT (rpc->crpc_bulk.bk_niov == 0);
284         LASSERT (list_empty(&rpc->crpc_list));
285         LASSERT (atomic_read(&rpc->crpc_refcount) == 0);
286 #ifndef __KERNEL__
287         LASSERT (rpc->crpc_bulk.bk_pages == NULL);
288 #endif
289
290         CDEBUG (D_NET,
291                 "Outgoing framework RPC done: "
292                 "service %d, peer %s, status %s:%d:%d\n",
293                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
294                 swi_state2str(rpc->crpc_wi.wi_state),
295                 rpc->crpc_aborted, rpc->crpc_status);
296
297         spin_lock(&sfw_data.fw_lock);
298
299         /* my callers must finish all RPCs before shutting me down */
300         LASSERT (!sfw_data.fw_shuttingdown);
301         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
302
303         spin_unlock(&sfw_data.fw_lock);
304         return;
305 }
306
307 sfw_batch_t *
308 sfw_find_batch (lst_bid_t bid)
309 {
310         sfw_session_t *sn = sfw_data.fw_session;
311         sfw_batch_t   *bat;
312
313         LASSERT (sn != NULL);
314
315         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
316                 if (bat->bat_id.bat_id == bid.bat_id)
317                         return bat;
318         }
319
320         return NULL;
321 }
322
323 sfw_batch_t *
324 sfw_bid2batch (lst_bid_t bid)
325 {
326         sfw_session_t *sn = sfw_data.fw_session;
327         sfw_batch_t   *bat;
328
329         LASSERT (sn != NULL);
330
331         bat = sfw_find_batch(bid);
332         if (bat != NULL)
333                 return bat;
334
335         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
336         if (bat == NULL) 
337                 return NULL;
338
339         bat->bat_error    = 0;
340         bat->bat_session  = sn;
341         bat->bat_id       = bid;
342         atomic_set(&bat->bat_nactive, 0);
343         CFS_INIT_LIST_HEAD(&bat->bat_tests);
344
345         list_add_tail(&bat->bat_list, &sn->sn_batches);
346         return bat;
347 }
348
349 int
350 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
351 {
352         sfw_session_t  *sn = sfw_data.fw_session;
353         sfw_counters_t *cnt = &reply->str_fw;
354         sfw_batch_t    *bat;
355
356         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
357
358         if (request->str_sid.ses_nid == LNET_NID_ANY) {
359                 reply->str_status = EINVAL;
360                 return 0;
361         }
362
363         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
364                 reply->str_status = ESRCH;
365                 return 0;
366         }
367
368         LNET_LOCK();
369         reply->str_lnet = the_lnet.ln_counters;
370         LNET_UNLOCK();
371
372         srpc_get_counters(&reply->str_rpc);
373
374         cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
375         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
376
377         cnt->active_tests = cnt->active_batches = 0;
378         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
379                 int n = atomic_read(&bat->bat_nactive);
380
381                 if (n > 0) {
382                         cnt->active_batches++;
383                         cnt->active_tests += n;
384                 }
385         }
386
387         reply->str_status = 0;
388         return 0;
389 }
390
391 int
392 sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
393 {
394         sfw_session_t *sn = sfw_data.fw_session;
395
396         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
397                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
398                 reply->mksn_status = EINVAL;
399                 return 0;
400         }
401
402         if (sn != NULL && !request->mksn_force) {
403                 reply->mksn_sid    = sn->sn_id;
404                 reply->mksn_status = EBUSY;
405                 strncpy(&reply->mksn_name[0], &sn->sn_name[0], LST_NAME_SIZE);
406                 return 0;
407         }
408         
409         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
410         if (sn == NULL) {
411                 CERROR ("Dropping RPC (mksn) under memory pressure.\n");
412                 return -ENOMEM;
413         }
414
415         sfw_init_session(sn, request->mksn_sid, &request->mksn_name[0]);
416
417         spin_lock(&sfw_data.fw_lock);
418
419         sfw_deactivate_session();
420         LASSERT (sfw_data.fw_session == NULL);
421         sfw_data.fw_session = sn;
422
423         spin_unlock(&sfw_data.fw_lock);
424
425         reply->mksn_status  = 0;
426         reply->mksn_sid     = sn->sn_id;
427         reply->mksn_timeout = sn->sn_timeout;
428         return 0;
429 }
430
431 int
432 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
433 {
434         sfw_session_t *sn = sfw_data.fw_session;
435
436         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
437
438         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
439                 reply->rmsn_status = EINVAL;
440                 return 0;
441         }
442
443         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
444                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
445                 return 0;
446         }
447
448         spin_lock(&sfw_data.fw_lock);
449         sfw_deactivate_session();
450         spin_unlock(&sfw_data.fw_lock);
451
452         reply->rmsn_status = 0;
453         reply->rmsn_sid    = LST_INVALID_SID;
454         LASSERT (sfw_data.fw_session == NULL);
455         return 0;
456 }
457
458 int
459 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
460 {
461         sfw_session_t *sn = sfw_data.fw_session;
462
463         if (sn == NULL) {
464                 reply->dbg_status = ESRCH;
465                 reply->dbg_sid    = LST_INVALID_SID;
466                 return 0;
467         } 
468
469         reply->dbg_status  = 0;
470         reply->dbg_sid     = sn->sn_id;      
471         reply->dbg_timeout = sn->sn_timeout;
472         strncpy(reply->dbg_name, &sn->sn_name[0], LST_NAME_SIZE);
473
474         return 0;
475 }
476
477 void
478 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
479 {
480         sfw_test_unit_t     *tsu = rpc->crpc_priv;
481         sfw_test_instance_t *tsi = tsu->tsu_instance;
482
483         /* Called with hold of tsi->tsi_lock */
484         LASSERT (list_empty(&rpc->crpc_list));
485         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
486 }
487
488 int
489 sfw_load_test (sfw_test_instance_t *tsi)
490 {
491         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
492         int              nrequired = sfw_test_buffers(tsi);
493         int              nposted;
494
495         LASSERT (tsc != NULL);
496
497         if (tsi->tsi_is_client) {
498                 tsi->tsi_ops = tsc->tsc_cli_ops;
499                 return 0;
500         }
501
502         nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired);
503         if (nposted != nrequired) {
504                 CWARN ("Failed to reserve enough buffers: "
505                        "service %s, %d needed, %d reserved\n",
506                        tsc->tsc_srv_service->sv_name, nrequired, nposted);
507                 srpc_service_remove_buffers(tsc->tsc_srv_service, nposted);
508                 return -ENOMEM;
509         }
510
511         CDEBUG (D_NET, "Reserved %d buffers for test %s\n",
512                 nposted, tsc->tsc_srv_service->sv_name);
513         return 0;
514 }
515
516 void
517 sfw_unload_test (sfw_test_instance_t *tsi)
518 {
519         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
520
521         LASSERT (tsc != NULL);
522
523         if (!tsi->tsi_is_client)
524                 srpc_service_remove_buffers(tsc->tsc_srv_service,
525                                             sfw_test_buffers(tsi));
526         return;
527 }
528
529 void
530 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
531 {
532         srpc_client_rpc_t *rpc;
533         sfw_test_unit_t   *tsu;
534
535         if (!tsi->tsi_is_client)
536                 goto clean;
537
538         tsi->tsi_ops->tso_fini(tsi);
539
540         LASSERT (!tsi->tsi_stopping);
541         LASSERT (list_empty(&tsi->tsi_active_rpcs));
542         LASSERT (!sfw_test_active(tsi));
543
544         while (!list_empty(&tsi->tsi_units)) {
545                 tsu = list_entry(tsi->tsi_units.next,
546                                  sfw_test_unit_t, tsu_list);
547                 list_del(&tsu->tsu_list);
548                 LIBCFS_FREE(tsu, sizeof(*tsu));
549         }
550
551         while (!list_empty(&tsi->tsi_free_rpcs)) {
552                 rpc = list_entry(tsi->tsi_free_rpcs.next,
553                                  srpc_client_rpc_t, crpc_list);
554                 list_del(&rpc->crpc_list);
555                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
556         }
557
558 clean:
559         sfw_unload_test(tsi);
560         LIBCFS_FREE(tsi, sizeof(*tsi));
561         return;
562 }
563
564 void
565 sfw_destroy_batch (sfw_batch_t *tsb)
566 {
567         sfw_test_instance_t *tsi;
568
569         LASSERT (!sfw_batch_active(tsb));
570         LASSERT (list_empty(&tsb->bat_list));
571
572         while (!list_empty(&tsb->bat_tests)) {
573                 tsi = list_entry(tsb->bat_tests.next,
574                                  sfw_test_instance_t, tsi_list);
575                 list_del_init(&tsi->tsi_list);
576                 sfw_destroy_test_instance(tsi);
577         }
578
579         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
580         return;
581 }
582
583 void
584 sfw_destroy_session (sfw_session_t *sn)
585 {
586         sfw_batch_t *batch;
587
588         LASSERT (list_empty(&sn->sn_list));
589         LASSERT (sn != sfw_data.fw_session);
590
591         while (!list_empty(&sn->sn_batches)) {
592                 batch = list_entry(sn->sn_batches.next,
593                                    sfw_batch_t, bat_list);
594                 list_del_init(&batch->bat_list);
595                 sfw_destroy_batch(batch);
596         }
597
598         LIBCFS_FREE(sn, sizeof(*sn));
599         atomic_dec(&sfw_data.fw_nzombies);
600         return;
601 }
602
603 void
604 sfw_unpack_test_req (srpc_msg_t *msg)
605 {
606         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
607
608         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
609         LASSERT (req->tsr_is_client);
610
611         if (msg->msg_magic == SRPC_MSG_MAGIC)
612                 return; /* no flipping needed */
613
614         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
615
616         if (req->tsr_service == SRPC_SERVICE_BRW) {
617                 test_bulk_req_t *bulk = &req->tsr_u.bulk;
618
619                 __swab32s(&bulk->blk_opc);
620                 __swab32s(&bulk->blk_npg);
621                 __swab32s(&bulk->blk_flags);
622                 return;
623         }
624
625         if (req->tsr_service == SRPC_SERVICE_PING) {
626                 test_ping_req_t *ping = &req->tsr_u.ping;
627
628                 __swab32s(&ping->png_size);
629                 __swab32s(&ping->png_flags);
630                 return;
631         }
632
633         LBUG ();
634         return;
635 }
636
637 int
638 sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
639 {
640         srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
641         srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
642         srpc_bulk_t         *bk = rpc->srpc_bulk;
643         int                  ndest = req->tsr_ndest;
644         sfw_test_unit_t     *tsu;
645         sfw_test_instance_t *tsi;
646         int                  i;
647         int                  rc;
648
649         LIBCFS_ALLOC(tsi, sizeof(*tsi));
650         if (tsi == NULL) {
651                 CERROR ("Can't allocate test instance for batch: "LPU64"\n",
652                         tsb->bat_id.bat_id);
653                 return -ENOMEM;
654         }
655
656         memset(tsi, 0, sizeof(*tsi));
657         spin_lock_init(&tsi->tsi_lock);
658         atomic_set(&tsi->tsi_nactive, 0);
659         CFS_INIT_LIST_HEAD(&tsi->tsi_units);
660         CFS_INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
661         CFS_INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
662
663         tsi->tsi_stopping   = 0;
664         tsi->tsi_batch      = tsb;
665         tsi->tsi_loop       = req->tsr_loop;
666         tsi->tsi_concur     = req->tsr_concur;
667         tsi->tsi_service    = req->tsr_service;
668         tsi->tsi_is_client  = !!(req->tsr_is_client);
669         tsi->tsi_stop_onerr = !!(req->tsr_stop_onerr);
670
671         rc = sfw_load_test(tsi);
672         if (rc != 0) {
673                 LIBCFS_FREE(tsi, sizeof(*tsi));
674                 return rc;
675         }
676
677         LASSERT (!sfw_batch_active(tsb));
678
679         if (!tsi->tsi_is_client) {
680                 /* it's test server, just add it to tsb */
681                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
682                 return 0;
683         }
684
685         LASSERT (bk != NULL);
686 #ifndef __KERNEL__
687         LASSERT (bk->bk_pages != NULL);
688 #endif
689         LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= ndest);
690         LASSERT (bk->bk_len >= sizeof(lnet_process_id_t) * ndest);
691
692         sfw_unpack_test_req(msg);
693         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
694
695         for (i = 0; i < ndest; i++) {
696                 lnet_process_id_t *dests;
697                 lnet_process_id_t  id;
698                 int                j;
699
700 #ifdef __KERNEL__
701                 dests = cfs_page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
702                 LASSERT (dests != NULL);  /* my pages are within KVM always */
703 #else
704                 dests = cfs_page_address(bk->bk_pages[i / SFW_ID_PER_PAGE]);
705 #endif
706                 id = dests[i % SFW_ID_PER_PAGE];
707                 if (msg->msg_magic != SRPC_MSG_MAGIC)
708                         sfw_unpack_id(id);
709
710                 for (j = 0; j < tsi->tsi_concur; j++) {
711                         LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
712                         if (tsu == NULL) {
713                                 rc = -ENOMEM;
714                                 CERROR ("Can't allocate tsu for %d\n",
715                                         tsi->tsi_service);
716                                 goto error;
717                         }
718
719                         tsu->tsu_dest     = id;
720                         tsu->tsu_instance = tsi;
721                         tsu->tsu_private  = NULL;
722                         list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
723                 }
724         }
725
726         rc = tsi->tsi_ops->tso_init(tsi);
727         if (rc == 0) {
728                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
729                 return 0;
730         }
731
732 error:
733         LASSERT (rc != 0);
734         sfw_destroy_test_instance(tsi);
735         return rc;
736 }
737
738 static void
739 sfw_test_unit_done (sfw_test_unit_t *tsu)
740 {
741         sfw_test_instance_t *tsi = tsu->tsu_instance;
742         sfw_batch_t         *tsb = tsi->tsi_batch;
743         sfw_session_t       *sn = tsb->bat_session;
744
745         LASSERT (sfw_test_active(tsi));
746
747         if (!atomic_dec_and_test(&tsi->tsi_nactive))
748                 return;
749         
750         /* the test instance is done */
751         spin_lock(&tsi->tsi_lock);
752
753         tsi->tsi_stopping = 0;
754
755         spin_unlock(&tsi->tsi_lock);
756
757         spin_lock(&sfw_data.fw_lock);
758
759         if (!atomic_dec_and_test(&tsb->bat_nactive) || /* tsb still active */
760             sn == sfw_data.fw_session) {               /* sn also active */
761                 spin_unlock(&sfw_data.fw_lock);
762                 return;
763         }
764         
765         LASSERT (!list_empty(&sn->sn_list)); /* I'm a zombie! */
766
767         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
768                 if (sfw_batch_active(tsb)) {
769                         spin_unlock(&sfw_data.fw_lock);
770                         return;
771                 }
772         }
773
774         list_del_init(&sn->sn_list);
775         spin_unlock(&sfw_data.fw_lock);
776
777         sfw_destroy_session(sn);
778         return;
779 }
780
781 void
782 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
783 {
784         sfw_test_unit_t     *tsu = rpc->crpc_priv;
785         sfw_test_instance_t *tsi = tsu->tsu_instance;
786         int                  done = 0;
787
788         if (rpc->crpc_status != 0 && tsu->tsu_error == 0 &&
789             (rpc->crpc_status != -EINTR || !tsi->tsi_stopping))
790                 tsu->tsu_error = rpc->crpc_status;
791
792         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
793                       
794         spin_lock(&tsi->tsi_lock);
795
796         LASSERT (sfw_test_active(tsi));
797         LASSERT (!list_empty(&rpc->crpc_list));
798
799         list_del_init(&rpc->crpc_list);
800
801         /* batch is stopping or loop is done or get error */
802         if (tsi->tsi_stopping ||
803             tsu->tsu_loop == 0 ||
804             (tsu->tsu_error != 0 && tsi->tsi_stop_onerr))
805                 done = 1;
806
807         /* dec ref for poster */
808         srpc_client_rpc_decref(rpc);
809
810         spin_unlock(&tsi->tsi_lock);
811
812         if (!done) {
813                 swi_schedule_workitem(&tsu->tsu_worker);
814                 return;
815         }
816
817         sfw_test_unit_done(tsu);
818         return;
819 }
820
821 int
822 sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer,
823                      int nblk, int blklen, srpc_client_rpc_t **rpcpp)
824 {
825         srpc_client_rpc_t   *rpc = NULL;
826         sfw_test_instance_t *tsi = tsu->tsu_instance;
827         
828         spin_lock(&tsi->tsi_lock);
829
830         LASSERT (sfw_test_active(tsi));
831
832         if (!list_empty(&tsi->tsi_free_rpcs)) {
833                 /* pick request from buffer */
834                 rpc = list_entry(tsi->tsi_free_rpcs.next,
835                                  srpc_client_rpc_t, crpc_list);
836                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
837                 list_del_init(&rpc->crpc_list);
838
839                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
840                                      blklen, sfw_test_rpc_done,
841                                      sfw_test_rpc_fini, tsu);
842         }
843
844         spin_unlock(&tsi->tsi_lock);
845         
846         if (rpc == NULL)
847                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
848                                              blklen, sfw_test_rpc_done, 
849                                              sfw_test_rpc_fini, tsu);
850         if (rpc == NULL) {
851                 CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
852                 return -ENOMEM;
853         }
854
855         *rpcpp = rpc;
856         return 0;
857 }
858
859 int
860 sfw_run_test (swi_workitem_t *wi)
861 {
862         sfw_test_unit_t     *tsu = wi->wi_data;
863         sfw_test_instance_t *tsi = tsu->tsu_instance;
864         srpc_client_rpc_t   *rpc = NULL;
865
866         LASSERT (wi == &tsu->tsu_worker);
867
868         tsu->tsu_error = tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc);
869         if (tsu->tsu_error != 0) {
870                 LASSERT (rpc == NULL);
871                 goto test_done;
872         }
873
874         LASSERT (rpc != NULL);
875
876         spin_lock(&tsi->tsi_lock);
877
878         if (tsi->tsi_stopping) {
879                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
880                 spin_unlock(&tsi->tsi_lock);
881                 goto test_done;
882         }
883
884         if (tsu->tsu_loop > 0)
885                 tsu->tsu_loop--;
886
887         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
888         spin_unlock(&tsi->tsi_lock);
889
890         rpc->crpc_timeout = SFW_TEST_RPC_TIMEOUT;
891
892         spin_lock(&rpc->crpc_lock);
893         srpc_post_rpc(rpc);
894         spin_unlock(&rpc->crpc_lock);
895         return 0;
896
897 test_done:
898         /*
899          * No one can schedule me now since:
900          * - previous RPC, if any, has done and
901          * - no new RPC is initiated.
902          * - my batch is still active; no one can run it again now.
903          * Cancel pending schedules and prevent future schedule attempts:
904          */
905         swi_kill_workitem(wi);
906         sfw_test_unit_done(tsu);
907         return 1;
908 }
909
910 int
911 sfw_run_batch (sfw_batch_t *tsb)
912 {
913         swi_workitem_t      *wi;
914         sfw_test_unit_t     *tsu;
915         sfw_test_instance_t *tsi;
916
917         if (sfw_batch_active(tsb)) {
918                 CDEBUG (D_NET, "Can't start active batch: "LPU64" (%d)\n",
919                         tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
920                 return -EPERM;
921         }
922
923         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
924                 if (!tsi->tsi_is_client) /* skip server instances */
925                         continue;
926
927                 LASSERT (!tsi->tsi_stopping);
928                 LASSERT (!sfw_test_active(tsi));
929
930                 atomic_inc(&tsb->bat_nactive);
931
932                 list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) {
933                         atomic_inc(&tsi->tsi_nactive);
934
935                         tsu->tsu_error = 0;
936                         tsu->tsu_loop  = tsi->tsi_loop;
937
938                         wi = &tsu->tsu_worker;
939                         swi_init_workitem(wi, tsu, sfw_run_test);
940                         swi_schedule_workitem(wi);
941                 }
942         }
943
944         return 0;
945 }
946
947 int
948 sfw_stop_batch (sfw_batch_t *tsb, int force)
949 {
950         sfw_test_instance_t *tsi;
951         srpc_client_rpc_t   *rpc;
952
953         if (!sfw_batch_active(tsb))
954                 return -EPERM;
955
956         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
957                 spin_lock(&tsi->tsi_lock);
958
959                 if (!tsi->tsi_is_client ||
960                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
961                         spin_unlock(&tsi->tsi_lock);
962                         continue;
963                 }
964
965                 tsi->tsi_stopping = 1;
966
967                 if (!force) {
968                         spin_unlock(&tsi->tsi_lock);
969                         continue;
970                 }
971
972                 /* abort launched rpcs in the test */
973                 list_for_each_entry (rpc, &tsi->tsi_active_rpcs, crpc_list) {
974                         spin_lock(&rpc->crpc_lock);
975
976                         srpc_abort_rpc(rpc, -EINTR);
977
978                         spin_unlock(&rpc->crpc_lock);
979                 }
980
981                 spin_unlock(&tsi->tsi_lock);
982         }
983
984         return 0;
985 }
986
987 int
988 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
989 {
990         sfw_test_instance_t *tsi;
991
992         if (testidx < 0)
993                 return -EINVAL;
994
995         if (testidx == 0) {
996                 reply->bar_active = atomic_read(&tsb->bat_nactive);
997                 return 0;
998         }
999
1000         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
1001                 if (testidx-- > 1)
1002                         continue;
1003
1004                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
1005                 return 0;
1006         }
1007
1008         return -ENOENT;
1009 }
1010
1011 void
1012 sfw_free_pages (srpc_server_rpc_t *rpc)
1013 {
1014         srpc_free_bulk(rpc->srpc_bulk);
1015         rpc->srpc_bulk = NULL;
1016 }
1017
1018 int
1019 sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink)
1020 {
1021         LASSERT (rpc->srpc_bulk == NULL);
1022         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
1023
1024         rpc->srpc_bulk = srpc_alloc_bulk(npages, sink);
1025         if (rpc->srpc_bulk == NULL) return -ENOMEM;
1026
1027         return 0;
1028 }
1029
1030 int
1031 sfw_add_test (srpc_server_rpc_t *rpc)
1032 {
1033         sfw_session_t     *sn = sfw_data.fw_session;
1034         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1035         srpc_test_reqst_t *request;
1036         int                rc;
1037         sfw_batch_t       *bat;
1038
1039         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1040         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1041
1042         if (request->tsr_loop == 0 ||
1043             request->tsr_concur == 0 ||
1044             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1045             request->tsr_ndest > SFW_MAX_NDESTS ||
1046             (request->tsr_is_client && request->tsr_ndest == 0) ||
1047             request->tsr_concur > SFW_MAX_CONCUR ||
1048             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1049             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1050                 reply->tsr_status = EINVAL;
1051                 return 0;
1052         }
1053
1054         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1055             sfw_find_test_case(request->tsr_service) == NULL) {
1056                 reply->tsr_status = ENOENT;
1057                 return 0;
1058         }
1059
1060         bat = sfw_bid2batch(request->tsr_bid);
1061         if (bat == NULL) {
1062                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
1063                         rpc->srpc_service->sv_name,
1064                         libcfs_id2str(rpc->srpc_peer));
1065                 return -ENOMEM;
1066         }
1067
1068         if (sfw_batch_active(bat)) {
1069                 reply->tsr_status = EBUSY;
1070                 return 0;
1071         }
1072
1073         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1074                 /* rpc will be resumed later in sfw_bulk_ready */
1075                 return sfw_alloc_pages(rpc,
1076                                        sfw_id_pages(request->tsr_ndest), 1);
1077         }
1078
1079         rc = sfw_add_test_instance(bat, rpc);
1080         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1081                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1082                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1083                 request->tsr_is_client ? "client" : "server",
1084                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1085
1086         reply->tsr_status = (rc < 0) ? -rc : rc;
1087         return 0;
1088 }
1089
1090 int
1091 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1092 {
1093         sfw_session_t *sn = sfw_data.fw_session;
1094         int            rc = 0;
1095         sfw_batch_t   *bat;
1096
1097         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1098
1099         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1100                 reply->bar_status = ESRCH;
1101                 return 0;
1102         }
1103
1104         bat = sfw_find_batch(request->bar_bid);
1105         if (bat == NULL) {
1106                 reply->bar_status = ENOENT;
1107                 return 0;
1108         }
1109
1110         switch (request->bar_opc) {
1111         case SRPC_BATCH_OPC_RUN:
1112                 rc = sfw_run_batch(bat);
1113                 break;
1114
1115         case SRPC_BATCH_OPC_STOP:
1116                 rc = sfw_stop_batch(bat, request->bar_arg);
1117                 break;
1118
1119         case SRPC_BATCH_OPC_QUERY:
1120                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1121                 break;
1122
1123         default:
1124                 return -EINVAL; /* drop it */
1125         }
1126
1127         reply->bar_status = (rc < 0) ? -rc : rc;
1128         return 0;
1129 }
1130
1131 int
1132 sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
1133 {
1134         srpc_service_t *sv = rpc->srpc_service;
1135         srpc_msg_t     *reply = &rpc->srpc_replymsg;
1136         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1137         int             rc = 0;
1138
1139         LASSERT (sfw_data.fw_active_srpc == NULL);
1140         LASSERT (sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1141
1142         spin_lock(&sfw_data.fw_lock);
1143
1144         if (sfw_data.fw_shuttingdown) {
1145                 spin_unlock(&sfw_data.fw_lock);
1146                 return -ESHUTDOWN;
1147         }
1148
1149         /* Remove timer to avoid racing with it or expiring active session */
1150         if (sfw_del_session_timer() != 0) {
1151                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer.",
1152                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1153                 spin_unlock(&sfw_data.fw_lock);
1154                 return -EAGAIN;
1155         }
1156
1157         sfw_data.fw_active_srpc = rpc;
1158         spin_unlock(&sfw_data.fw_lock);
1159
1160         sfw_unpack_message(request);
1161         LASSERT (request->msg_type == srpc_service2request(sv->sv_id));
1162
1163         switch(sv->sv_id) {
1164         default:
1165                 LBUG ();
1166         case SRPC_SERVICE_TEST:
1167                 rc = sfw_add_test(rpc);
1168                 break;
1169
1170         case SRPC_SERVICE_BATCH:
1171                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1172                                        &reply->msg_body.bat_reply);
1173                 break;
1174
1175         case SRPC_SERVICE_QUERY_STAT:
1176                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1177                                    &reply->msg_body.stat_reply);
1178                 break;
1179
1180         case SRPC_SERVICE_DEBUG:
1181                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1182                                        &reply->msg_body.dbg_reply);
1183                 break;
1184
1185         case SRPC_SERVICE_MAKE_SESSION:
1186                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1187                                       &reply->msg_body.mksn_reply);
1188                 break;
1189
1190         case SRPC_SERVICE_REMOVE_SESSION:
1191                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1192                                         &reply->msg_body.rmsn_reply);
1193                 break;
1194         }
1195
1196         rpc->srpc_done = sfw_server_rpc_done;
1197         spin_lock(&sfw_data.fw_lock);
1198
1199 #ifdef __KERNEL__
1200         if (!sfw_data.fw_shuttingdown)
1201                 sfw_add_session_timer();
1202 #else
1203         LASSERT (!sfw_data.fw_shuttingdown);
1204         sfw_add_session_timer();
1205 #endif
1206
1207         sfw_data.fw_active_srpc = NULL;
1208         spin_unlock(&sfw_data.fw_lock);
1209         return rc;
1210 }
1211
1212 int
1213 sfw_bulk_ready (srpc_server_rpc_t *rpc, int status)
1214 {
1215         srpc_service_t *sv = rpc->srpc_service;
1216         int             rc;
1217
1218         LASSERT (rpc->srpc_bulk != NULL);
1219         LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
1220         LASSERT (sfw_data.fw_active_srpc == NULL);
1221         LASSERT (rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1222
1223         spin_lock(&sfw_data.fw_lock);
1224
1225         if (status != 0) {
1226                 CERROR ("Bulk transfer failed for RPC: "
1227                         "service %s, peer %s, status %d\n",
1228                         sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1229                 spin_unlock(&sfw_data.fw_lock);
1230                 return -EIO;
1231         }
1232
1233         if (sfw_data.fw_shuttingdown) {
1234                 spin_unlock(&sfw_data.fw_lock);
1235                 return -ESHUTDOWN;
1236         }
1237
1238         if (sfw_del_session_timer() != 0) {
1239                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer",
1240                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1241                 spin_unlock(&sfw_data.fw_lock);
1242                 return -EAGAIN;
1243         }
1244
1245         sfw_data.fw_active_srpc = rpc;
1246         spin_unlock(&sfw_data.fw_lock);
1247
1248         rc = sfw_add_test(rpc);
1249
1250         spin_lock(&sfw_data.fw_lock);
1251
1252 #ifdef __KERNEL__
1253         if (!sfw_data.fw_shuttingdown)
1254                 sfw_add_session_timer();
1255 #else
1256         LASSERT (!sfw_data.fw_shuttingdown);
1257         sfw_add_session_timer();
1258 #endif
1259
1260         sfw_data.fw_active_srpc = NULL;
1261         spin_unlock(&sfw_data.fw_lock);
1262         return rc;
1263 }
1264
1265 srpc_client_rpc_t *
1266 sfw_create_rpc (lnet_process_id_t peer, int service,
1267                 int nbulkiov, int bulklen,
1268                 void (*done) (srpc_client_rpc_t *), void *priv)
1269 {
1270         srpc_client_rpc_t *rpc;
1271
1272         spin_lock(&sfw_data.fw_lock);
1273
1274         LASSERT (!sfw_data.fw_shuttingdown);
1275         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1276
1277         if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1278                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1279                                  srpc_client_rpc_t, crpc_list);
1280                 list_del(&rpc->crpc_list);
1281                 spin_unlock(&sfw_data.fw_lock);
1282
1283                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1284                                      done, sfw_client_rpc_fini, priv);
1285                 return rpc;
1286         }
1287
1288         spin_unlock(&sfw_data.fw_lock);
1289
1290         rpc = srpc_create_client_rpc(peer, service, nbulkiov, bulklen, done,
1291                                      nbulkiov != 0 ? NULL : sfw_client_rpc_fini,
1292                                      priv);
1293         return rpc;
1294 }
1295
/*
 * Byte-swap, in place, a framework message that was sent by a peer of the
 * opposite endianness.
 *
 * A message whose msg_magic already equals SRPC_MSG_MAGIC is native and is
 * left untouched.  Otherwise the magic must be the byte-swapped constant.
 * msg_type is flipped first, then the body fields of that message type.
 * An unknown type is a bug (LBUG).
 *
 * NOTE(review): msg_magic itself is not flipped here, so calling this twice
 * on the same buffer would swap the body back again.  Callers presumably
 * unpack each message exactly once; confirm.
 */
void
sfw_unpack_message (srpc_msg_t *msg)
{
        if (msg->msg_magic == SRPC_MSG_MAGIC)
                return; /* no flipping needed */

        LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));

        /* msg_type must be native before it can select the body layout */
        __swab32s(&msg->msg_type);

        if (msg->msg_type == SRPC_MSG_STAT_REQST) {
                srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;

                __swab32s(&req->str_type);
                __swab64s(&req->str_rpyid);
                sfw_unpack_sid(req->str_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
                srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;

                __swab32s(&rep->str_status);
                sfw_unpack_sid(rep->str_sid);
                sfw_unpack_fw_counters(rep->str_fw);
                sfw_unpack_rpc_counters(rep->str_rpc);
                sfw_unpack_lnet_counters(rep->str_lnet);
                return;
        }

        if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
                srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;

                __swab64s(&req->mksn_rpyid);
                __swab32s(&req->mksn_force);
                sfw_unpack_sid(req->mksn_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
                srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;

                __swab32s(&rep->mksn_status);
                __swab32s(&rep->mksn_timeout);
                sfw_unpack_sid(rep->mksn_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
                srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;

                __swab64s(&req->rmsn_rpyid);
                sfw_unpack_sid(req->rmsn_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
                srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;

                __swab32s(&rep->rmsn_status);
                sfw_unpack_sid(rep->rmsn_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
                srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;

                __swab64s(&req->dbg_rpyid);
                __swab32s(&req->dbg_flags);
                sfw_unpack_sid(req->dbg_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
                srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;

                __swab32s(&rep->dbg_nbatch);
                __swab32s(&rep->dbg_timeout);
                sfw_unpack_sid(rep->dbg_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
                srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;

                __swab32s(&req->bar_opc);
                __swab64s(&req->bar_rpyid);
                __swab32s(&req->bar_testidx);
                __swab32s(&req->bar_arg);
                sfw_unpack_sid(req->bar_sid);
                __swab64s(&req->bar_bid.bat_id);
                return;
        }

        if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
                srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;

                __swab32s(&rep->bar_status);
                sfw_unpack_sid(rep->bar_sid);
                return;
        }

        /* NOTE(review): only the fields listed here are flipped; any other
         * members of srpc_test_reqst_t (not visible in this file chunk)
         * are presumably handled elsewhere or are single bytes - confirm. */
        if (msg->msg_type == SRPC_MSG_TEST_REQST) {
                srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;

                __swab64s(&req->tsr_rpyid);
                __swab64s(&req->tsr_bulkid);
                __swab32s(&req->tsr_loop);
                __swab32s(&req->tsr_ndest);
                __swab32s(&req->tsr_concur);
                __swab32s(&req->tsr_service);
                sfw_unpack_sid(req->tsr_sid);
                __swab64s(&req->tsr_bid.bat_id);
                return;
        }

        if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
                srpc_test_reply_t *rep = &msg->msg_body.tes_reply;

                __swab32s(&rep->tsr_status);
                sfw_unpack_sid(rep->tsr_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
                srpc_join_reqst_t *req = &msg->msg_body.join_reqst;

                __swab64s(&req->join_rpyid);
                sfw_unpack_sid(req->join_sid);
                return;
        }

        if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
                srpc_join_reply_t *rep = &msg->msg_body.join_reply;

                __swab32s(&rep->join_status);
                __swab32s(&rep->join_timeout);
                sfw_unpack_sid(rep->join_sid);
                return;
        }

        /* unknown message type: cannot tell how to flip the body */
        LBUG ();
        return;
}
1440
1441 void
1442 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1443 {
1444         LASSERT (atomic_read(&rpc->crpc_refcount) > 0);
1445         LASSERT (rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1446
1447         spin_lock(&rpc->crpc_lock);
1448         srpc_abort_rpc(rpc, -EINTR);
1449         spin_unlock(&rpc->crpc_lock);
1450         return;
1451 }
1452
1453 void
1454 sfw_post_rpc (srpc_client_rpc_t *rpc)
1455 {
1456         spin_lock(&rpc->crpc_lock);
1457
1458         LASSERT (!rpc->crpc_closed);
1459         LASSERT (!rpc->crpc_aborted);
1460         LASSERT (list_empty(&rpc->crpc_list));
1461         LASSERT (!sfw_data.fw_shuttingdown);
1462
1463         rpc->crpc_timeout = SFW_CLIENT_RPC_TIMEOUT;
1464         srpc_post_rpc(rpc);
1465
1466         spin_unlock(&rpc->crpc_lock);
1467         return;
1468 }
1469
1470 static srpc_service_t sfw_services[] = 
1471 {
1472         {
1473                 .sv_name = "debug",
1474                 .sv_id   = SRPC_SERVICE_DEBUG,
1475         },
1476         {
1477                 .sv_name = "query stats",
1478                 .sv_id   = SRPC_SERVICE_QUERY_STAT,
1479         },
1480         {
1481                 .sv_name = "make sessin",
1482                 .sv_id   = SRPC_SERVICE_MAKE_SESSION,
1483         },
1484         {
1485                 .sv_name = "remove session",
1486                 .sv_id   = SRPC_SERVICE_REMOVE_SESSION,
1487         },
1488         {
1489                 .sv_name = "batch service",
1490                 .sv_id   = SRPC_SERVICE_BATCH,
1491         },
1492         {
1493                 .sv_name = "test service",
1494                 .sv_id   = SRPC_SERVICE_TEST,
1495         },
1496         {       .sv_name = NULL, }
1497 };
1498
/*
 * Client ops and server-side services of the built-in test types.  They
 * are defined in other files (presumably the ping and brw test modules)
 * and registered by sfw_startup() through sfw_register_test().
 */
extern sfw_test_client_ops_t ping_test_client;
extern srpc_service_t        ping_test_service;

extern sfw_test_client_ops_t brw_test_client;
extern srpc_service_t        brw_test_service;
1504
1505 int
1506 sfw_startup (void)
1507 {
1508         int              i;
1509         int              rc;
1510         int              error;
1511         srpc_service_t  *sv;
1512         sfw_test_case_t *tsc;
1513
1514         if (session_timeout < 0) {
1515                 CERROR ("Session timeout must be non-negative: %d\n",
1516                         session_timeout);
1517                 return -EINVAL;
1518         }
1519
1520         if (session_timeout == 0)
1521                 CWARN ("Zero session_timeout specified "
1522                        "- test sessions never timeout.\n");
1523
1524         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1525
1526         sfw_data.fw_session     = NULL;
1527         sfw_data.fw_active_srpc = NULL;
1528         spin_lock_init(&sfw_data.fw_lock);
1529         atomic_set(&sfw_data.fw_nzombies, 0);
1530         CFS_INIT_LIST_HEAD(&sfw_data.fw_tests);
1531         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1532         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1533
1534         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1535         LASSERT (rc == 0);
1536         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1537         LASSERT (rc == 0);
1538
1539         error = 0;
1540         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1541                 sv = tsc->tsc_srv_service;
1542                 sv->sv_concur = SFW_TEST_CONCURRENCY;
1543
1544                 rc = srpc_add_service(sv);
1545                 LASSERT (rc != -EBUSY);
1546                 if (rc != 0) {
1547                         CWARN ("Failed to add %s service: %d\n",
1548                                sv->sv_name, rc);
1549                         error = rc;
1550                 }
1551         }
1552
1553         for (i = 0; ; i++) {
1554                 sv = &sfw_services[i];
1555                 if (sv->sv_name == NULL) break;
1556
1557                 sv->sv_bulk_ready = NULL;
1558                 sv->sv_handler    = sfw_handle_server_rpc;
1559                 sv->sv_concur     = SFW_SERVICE_CONCURRENCY;
1560                 if (sv->sv_id == SRPC_SERVICE_TEST)
1561                         sv->sv_bulk_ready = sfw_bulk_ready;
1562
1563                 rc = srpc_add_service(sv);
1564                 LASSERT (rc != -EBUSY);
1565                 if (rc != 0) {
1566                         CWARN ("Failed to add %s service: %d\n",
1567                                sv->sv_name, rc);
1568                         error = rc;
1569                 }
1570
1571                 /* about to sfw_shutdown, no need to add buffer */
1572                 if (error) continue; 
1573
1574                 rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS);
1575                 if (rc != SFW_POST_BUFFERS) {
1576                         CWARN ("Failed to reserve enough buffers: "
1577                                "service %s, %d needed, %d reserved\n",
1578                                sv->sv_name, SFW_POST_BUFFERS, rc);
1579                         error = -ENOMEM;
1580                 }
1581         }
1582
1583         if (error != 0)
1584                 sfw_shutdown();
1585         return error;
1586 }
1587
/*
 * Tear down the self-test framework.  It is also called by sfw_startup()
 * to undo a partial start.  The order of the steps below matters:
 *
 *  1. Under fw_lock, set fw_shuttingdown so that sfw_handle_server_rpc()
 *     and sfw_bulk_ready() refuse new requests.  Then wait for the request
 *     in progress, if any (fw_active_srpc), to finish.
 *  2. Remove the session timer.  If that fails, wait until fw_session
 *     becomes NULL (presumably cleared when the timer fires).  Deactivate
 *     the current session, then wait for all zombie sessions to go away.
 *  3. Shut down and remove the framework services, then the test services.
 *  4. Free the client RPCs cached on fw_zombie_rpcs.
 *  5. Wait for every service shutdown to complete, then unlink and free
 *     the registered test cases.
 *
 * NOTE(review): lst_wait_until() is defined elsewhere.  It is handed
 * fw_lock and presumably drops and re-takes it while waiting; confirm.
 */
void
sfw_shutdown (void)
{
        srpc_service_t  *sv;
        sfw_test_case_t *tsc;
        int              i;

        spin_lock(&sfw_data.fw_lock);

        sfw_data.fw_shuttingdown = 1;
#ifdef __KERNEL__
        lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
                       "waiting for active RPC to finish.\n");
#else
        /* user space: no request can be in progress concurrently */
        LASSERT (sfw_data.fw_active_srpc == NULL);
#endif

        /* timer could not be removed: let it run and clear the session */
        if (sfw_del_session_timer() != 0)
                lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
                               "waiting for session timer to explode.\n");

        sfw_deactivate_session();
        lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
                       sfw_data.fw_lock,
                       "waiting for %d zombie sessions to die.\n",
                       atomic_read(&sfw_data.fw_nzombies));

        spin_unlock(&sfw_data.fw_lock);

        /* framework services (sfw_services[] up to its NULL sentinel) */
        for (i = 0; ; i++) {
                sv = &sfw_services[i];
                if (sv->sv_name == NULL)
                        break;

                srpc_shutdown_service(sv);
                srpc_remove_service(sv);
        }

        /* server side of each registered test type */
        list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
                sv = tsc->tsc_srv_service;
                srpc_shutdown_service(sv);
                srpc_remove_service(sv);
        }

        /* release client RPCs cached for reuse by sfw_create_rpc() */
        while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
                srpc_client_rpc_t *rpc;

                rpc = list_entry(sfw_data.fw_zombie_rpcs.next, 
                                 srpc_client_rpc_t, crpc_list);
                list_del(&rpc->crpc_list);

                LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
        }

        for (i = 0; ; i++) {
                sv = &sfw_services[i];
                if (sv->sv_name == NULL)
                        break;

                srpc_wait_service_shutdown(sv);
        }

        /* wait for each test service to finish, then free its test case */
        while (!list_empty(&sfw_data.fw_tests)) {
                tsc = list_entry(sfw_data.fw_tests.next,
                                 sfw_test_case_t, tsc_list);
                
                srpc_wait_service_shutdown(tsc->tsc_srv_service);

                list_del(&tsc->tsc_list);
                LIBCFS_FREE(tsc, sizeof(*tsc));
        }

        return;
}