Whamcloud - gitweb
i=liangzhen:
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Authors: Isaac Huang <isaac@clusterfs.com>
6  *            Liang Zhen  <liangzhen@clusterfs.com>
7  */
8
9 #define DEBUG_SUBSYSTEM S_LNET
10
11 #include <libcfs/kp30.h>
12 #include <libcfs/libcfs.h>
13 #include <lnet/lib-lnet.h>
14
15 #include "selftest.h"
16
17
/* Module parameter (mode 0644); it is only declared here, not read in this
 * part of the file. */
int brw_inject_errors = 0;
CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644,
                "# data errors to inject randomly, zero by default");

/* Module parameter (mode 0444).  sfw_init_session() copies it into
 * sn_timeout of each new session; sfw_add_session_timer() arms no timer
 * when that value is 0. */
static int session_timeout = 100;
CFS_MODULE_PARM(session_timeout, "i", int, 0444,
                "test session timeout in seconds (100 by default, 0 == never)");
25
#define SFW_TEST_CONCURRENCY     128
/* assigned to crpc_timeout of every test RPC posted by sfw_run_test() */
#define SFW_TEST_RPC_TIMEOUT     64
#define SFW_CLIENT_RPC_TIMEOUT   64  /* in seconds */
#define SFW_EXTRA_TEST_BUFFERS   8 /* tolerate buggy peers with extra buffers */

/* # service buffers a server-side test instance reserves in sfw_load_test()
 * and gives back in sfw_unload_test(): tsi_loop plus the slack above */
#define sfw_test_buffers(tsi)    ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS)
32
/* Byte-swap, in place, the nid and pid members of \a id (an lvalue, not a
 * pointer).  Used by sfw_add_test_instance() on ids received in a message
 * whose magic is not SRPC_MSG_MAGIC.  NB \a id is expanded twice. */
#define sfw_unpack_id(id)               \
do {                                    \
        __swab64s(&(id).nid);           \
        __swab32s(&(id).pid);           \
} while (0)
38
/* Byte-swap, in place, both 64-bit members (ses_nid, ses_stamp) of the
 * session id lvalue \a sid.  NB \a sid is expanded twice. */
#define sfw_unpack_sid(sid)             \
do {                                    \
        __swab64s(&(sid).ses_nid);      \
        __swab64s(&(sid).ses_stamp);    \
} while (0)
44
/* Byte-swap, in place, the five 32-bit framework counters of the lvalue
 * \a fc (the same members sfw_get_stats() fills in).  NB \a fc is expanded
 * once per member. */
#define sfw_unpack_fw_counters(fc)        \
do {                                      \
        __swab32s(&(fc).brw_errors);      \
        __swab32s(&(fc).ping_errors);     \
        __swab32s(&(fc).active_tests);    \
        __swab32s(&(fc).active_batches);  \
        __swab32s(&(fc).zombie_sessions); \
} while (0)
53
/* Byte-swap, in place, the RPC counters of the lvalue \a rc: five 32-bit
 * counts followed by the two 64-bit bulk totals.  NB \a rc is expanded once
 * per member. */
#define sfw_unpack_rpc_counters(rc)     \
do {                                    \
        __swab32s(&(rc).errors);        \
        __swab32s(&(rc).rpcs_sent);     \
        __swab32s(&(rc).rpcs_rcvd);     \
        __swab32s(&(rc).rpcs_dropped);  \
        __swab32s(&(rc).rpcs_expired);  \
        __swab64s(&(rc).bulk_get);      \
        __swab64s(&(rc).bulk_put);      \
} while (0)
64
/* Byte-swap, in place, the LNet counters of the lvalue \a lc: seven 32-bit
 * counts followed by four 64-bit lengths.  NB \a lc is expanded once per
 * member. */
#define sfw_unpack_lnet_counters(lc)    \
do {                                    \
        __swab32s(&(lc).errors);        \
        __swab32s(&(lc).msgs_max);      \
        __swab32s(&(lc).msgs_alloc);    \
        __swab32s(&(lc).send_count);    \
        __swab32s(&(lc).recv_count);    \
        __swab32s(&(lc).drop_count);    \
        __swab32s(&(lc).route_count);   \
        __swab64s(&(lc).send_length);   \
        __swab64s(&(lc).recv_length);   \
        __swab64s(&(lc).drop_length);   \
        __swab64s(&(lc).route_length);  \
} while (0)
79
/* A test instance is active while tsi_nactive is non-zero (it is bumped once
 * per test unit by sfw_run_batch()); a batch is active while bat_nactive is
 * non-zero (bumped once per client test instance by sfw_run_batch()). */
#define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
#define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
82
/* Global state of the selftest framework; the single instance is sfw_data. */
struct smoketest_framework {
        struct list_head   fw_zombie_rpcs;     /* RPCs to be recycled */
        struct list_head   fw_zombie_sessions; /* stopping sessions */
        struct list_head   fw_tests;           /* registered test cases */
        atomic_t           fw_nzombies;        /* # zombie sessions */
        spinlock_t         fw_lock;            /* serialise */
        sfw_session_t     *fw_session;         /* _the_ session */
        int                fw_shuttingdown;    /* shutdown in progress */
        srpc_server_rpc_t *fw_active_srpc;     /* running RPC */
} sfw_data;
93
/* forward ref's: both are defined further down but are already needed by
 * sfw_deactivate_session() */
int sfw_stop_batch (sfw_batch_t *tsb, int force);
void sfw_destroy_session (sfw_session_t *sn);
97
98 static inline sfw_test_case_t *
99 sfw_find_test_case(int id)
100 {
101         sfw_test_case_t *tsc;
102
103         LASSERT (id <= SRPC_SERVICE_MAX_ID);
104         LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
105
106         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
107                 if (tsc->tsc_srv_service->sv_id == id)
108                         return tsc;
109         }
110
111         return NULL;
112 }
113
114 static int
115 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
116 {
117         sfw_test_case_t *tsc;
118
119         if (sfw_find_test_case(service->sv_id) != NULL) {
120                 CERROR ("Failed to register test %s (%d)\n",
121                         service->sv_name, service->sv_id);
122                 return -EEXIST;
123         }
124
125         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
126         if (tsc == NULL)
127                 return -ENOMEM;
128
129         memset(tsc, 0, sizeof(sfw_test_case_t));
130         tsc->tsc_cli_ops     = cliops;
131         tsc->tsc_srv_service = service;
132
133         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
134         return 0;
135 }
136
137 void
138 sfw_add_session_timer (void)
139 {
140         sfw_session_t *sn = sfw_data.fw_session;
141         stt_timer_t   *timer = &sn->sn_timer;
142
143         LASSERT (!sfw_data.fw_shuttingdown);
144
145         if (sn == NULL || sn->sn_timeout == 0)
146                 return;
147
148         LASSERT (!sn->sn_timer_active);
149
150         sn->sn_timer_active = 1;
151         timer->stt_expires = cfs_time_add(sn->sn_timeout,
152                                           cfs_time_current_sec());
153         stt_add_timer(timer);
154         return;
155 }
156
157 int
158 sfw_del_session_timer (void)
159 {
160         sfw_session_t *sn = sfw_data.fw_session;
161
162         if (sn == NULL || !sn->sn_timer_active)
163                 return 0;
164
165         LASSERT (sn->sn_timeout != 0);
166
167         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
168                 sn->sn_timer_active = 0;
169                 return 0;
170         }
171
172 #ifndef __KERNEL__
173         /* Racing is impossible in single-threaded userland selftest */
174         LBUG();
175 #endif
176         return EBUSY; /* racing with sfw_session_expired() */
177 }
178
179 /* called with sfw_data.fw_lock held */
180 static void
181 sfw_deactivate_session (void)
182 {
183         sfw_session_t *sn = sfw_data.fw_session;
184         int            nactive = 0;
185         sfw_batch_t   *tsb;
186
187         if (sn == NULL) return;
188
189         LASSERT (!sn->sn_timer_active);
190
191         sfw_data.fw_session = NULL;
192         atomic_inc(&sfw_data.fw_nzombies);
193         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
194
195         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
196                 if (sfw_batch_active(tsb)) {
197                         nactive++;
198                         sfw_stop_batch(tsb, 1);
199                 }
200         }
201
202         if (nactive != 0)
203                 return;   /* wait for active batches to stop */
204
205         list_del_init(&sn->sn_list);
206         spin_unlock(&sfw_data.fw_lock);
207
208         sfw_destroy_session(sn);
209
210         spin_lock(&sfw_data.fw_lock);
211         return;
212 }
213
214 #ifndef __KERNEL__
215
216 int
217 sfw_session_removed (void)
218 {
219         return (sfw_data.fw_session == NULL) ? 1 : 0;
220 }
221
222 #endif
223
224 void
225 sfw_session_expired (void *data)
226 {
227         sfw_session_t *sn = data;
228
229         spin_lock(&sfw_data.fw_lock);
230
231         LASSERT (sn->sn_timer_active);
232         LASSERT (sn == sfw_data.fw_session);
233
234         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
235                libcfs_nid2str(sn->sn_id.ses_nid),
236                sn->sn_id.ses_stamp, &sn->sn_name[0]);
237
238         sn->sn_timer_active = 0;
239         sfw_deactivate_session();
240
241         spin_unlock(&sfw_data.fw_lock);
242         return;
243 }
244
245 static inline void
246 sfw_init_session (sfw_session_t *sn, lst_sid_t sid, const char *name)
247 {
248         stt_timer_t *timer = &sn->sn_timer;
249
250         memset(sn, 0, sizeof(sfw_session_t));
251         CFS_INIT_LIST_HEAD(&sn->sn_list);
252         CFS_INIT_LIST_HEAD(&sn->sn_batches);
253         atomic_set(&sn->sn_brw_errors, 0);
254         atomic_set(&sn->sn_ping_errors, 0);
255         strncpy(&sn->sn_name[0], name, LST_NAME_SIZE);
256
257         sn->sn_timer_active = 0;
258         sn->sn_id           = sid;
259         sn->sn_timeout      = session_timeout;
260
261         timer->stt_data = sn;
262         timer->stt_func = sfw_session_expired;
263         CFS_INIT_LIST_HEAD(&timer->stt_list);
264 }
265
266 /* completion handler for incoming framework RPCs */
267 void
268 sfw_server_rpc_done (srpc_server_rpc_t *rpc)
269 {
270         srpc_service_t *sv = rpc->srpc_service;
271         int             status = rpc->srpc_status;
272
273         CDEBUG (D_NET,
274                 "Incoming framework RPC done: "
275                 "service %s, peer %s, status %s:%d\n",
276                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
277                 swi_state2str(rpc->srpc_wi.wi_state),
278                 status);
279
280         if (rpc->srpc_bulk != NULL)
281                 sfw_free_pages(rpc);
282         return;
283 }
284
285 void
286 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
287 {
288         LASSERT (rpc->crpc_bulk.bk_niov == 0);
289         LASSERT (list_empty(&rpc->crpc_list));
290         LASSERT (atomic_read(&rpc->crpc_refcount) == 0);
291 #ifndef __KERNEL__
292         LASSERT (rpc->crpc_bulk.bk_pages == NULL);
293 #endif
294
295         CDEBUG (D_NET,
296                 "Outgoing framework RPC done: "
297                 "service %d, peer %s, status %s:%d:%d\n",
298                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
299                 swi_state2str(rpc->crpc_wi.wi_state),
300                 rpc->crpc_aborted, rpc->crpc_status);
301
302         spin_lock(&sfw_data.fw_lock);
303
304         /* my callers must finish all RPCs before shutting me down */
305         LASSERT (!sfw_data.fw_shuttingdown);
306         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
307
308         spin_unlock(&sfw_data.fw_lock);
309         return;
310 }
311
312 sfw_batch_t *
313 sfw_find_batch (lst_bid_t bid)
314 {
315         sfw_session_t *sn = sfw_data.fw_session;
316         sfw_batch_t   *bat;
317
318         LASSERT (sn != NULL);
319
320         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
321                 if (bat->bat_id.bat_id == bid.bat_id)
322                         return bat;
323         }
324
325         return NULL;
326 }
327
328 sfw_batch_t *
329 sfw_bid2batch (lst_bid_t bid)
330 {
331         sfw_session_t *sn = sfw_data.fw_session;
332         sfw_batch_t   *bat;
333
334         LASSERT (sn != NULL);
335
336         bat = sfw_find_batch(bid);
337         if (bat != NULL)
338                 return bat;
339
340         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
341         if (bat == NULL) 
342                 return NULL;
343
344         bat->bat_error    = 0;
345         bat->bat_session  = sn;
346         bat->bat_id       = bid;
347         atomic_set(&bat->bat_nactive, 0);
348         CFS_INIT_LIST_HEAD(&bat->bat_tests);
349
350         list_add_tail(&bat->bat_list, &sn->sn_batches);
351         return bat;
352 }
353
354 int
355 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
356 {
357         sfw_session_t  *sn = sfw_data.fw_session;
358         sfw_counters_t *cnt = &reply->str_fw;
359         sfw_batch_t    *bat;
360
361         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
362
363         if (request->str_sid.ses_nid == LNET_NID_ANY) {
364                 reply->str_status = EINVAL;
365                 return 0;
366         }
367
368         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
369                 reply->str_status = ESRCH;
370                 return 0;
371         }
372
373         LNET_LOCK();
374         reply->str_lnet = the_lnet.ln_counters;
375         LNET_UNLOCK();
376
377         srpc_get_counters(&reply->str_rpc);
378
379         cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
380         cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
381         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
382
383         cnt->active_tests = cnt->active_batches = 0;
384         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
385                 int n = atomic_read(&bat->bat_nactive);
386
387                 if (n > 0) {
388                         cnt->active_batches++;
389                         cnt->active_tests += n;
390                 }
391         }
392
393         reply->str_status = 0;
394         return 0;
395 }
396
397 int
398 sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
399 {
400         sfw_session_t *sn = sfw_data.fw_session;
401
402         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
403                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
404                 reply->mksn_status = EINVAL;
405                 return 0;
406         }
407
408         if (sn != NULL && !request->mksn_force) {
409                 reply->mksn_sid    = sn->sn_id;
410                 reply->mksn_status = EBUSY;
411                 strncpy(&reply->mksn_name[0], &sn->sn_name[0], LST_NAME_SIZE);
412                 return 0;
413         }
414         
415         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
416         if (sn == NULL) {
417                 CERROR ("Dropping RPC (mksn) under memory pressure.\n");
418                 return -ENOMEM;
419         }
420
421         sfw_init_session(sn, request->mksn_sid, &request->mksn_name[0]);
422
423         spin_lock(&sfw_data.fw_lock);
424
425         sfw_deactivate_session();
426         LASSERT (sfw_data.fw_session == NULL);
427         sfw_data.fw_session = sn;
428
429         spin_unlock(&sfw_data.fw_lock);
430
431         reply->mksn_status  = 0;
432         reply->mksn_sid     = sn->sn_id;
433         reply->mksn_timeout = sn->sn_timeout;
434         return 0;
435 }
436
437 int
438 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
439 {
440         sfw_session_t *sn = sfw_data.fw_session;
441
442         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
443
444         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
445                 reply->rmsn_status = EINVAL;
446                 return 0;
447         }
448
449         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
450                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
451                 return 0;
452         }
453
454         spin_lock(&sfw_data.fw_lock);
455         sfw_deactivate_session();
456         spin_unlock(&sfw_data.fw_lock);
457
458         reply->rmsn_status = 0;
459         reply->rmsn_sid    = LST_INVALID_SID;
460         LASSERT (sfw_data.fw_session == NULL);
461         return 0;
462 }
463
464 int
465 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
466 {
467         sfw_session_t *sn = sfw_data.fw_session;
468
469         if (sn == NULL) {
470                 reply->dbg_status = ESRCH;
471                 reply->dbg_sid    = LST_INVALID_SID;
472                 return 0;
473         } 
474
475         reply->dbg_status  = 0;
476         reply->dbg_sid     = sn->sn_id;      
477         reply->dbg_timeout = sn->sn_timeout;
478         strncpy(reply->dbg_name, &sn->sn_name[0], LST_NAME_SIZE);
479
480         return 0;
481 }
482
483 void
484 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
485 {
486         sfw_test_unit_t     *tsu = rpc->crpc_priv;
487         sfw_test_instance_t *tsi = tsu->tsu_instance;
488
489         /* Called with hold of tsi->tsi_lock */
490         LASSERT (list_empty(&rpc->crpc_list));
491         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
492 }
493
494 int
495 sfw_load_test (sfw_test_instance_t *tsi)
496 {
497         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
498         int              nrequired = sfw_test_buffers(tsi);
499         int              nposted;
500
501         LASSERT (tsc != NULL);
502
503         if (tsi->tsi_is_client) {
504                 tsi->tsi_ops = tsc->tsc_cli_ops;
505                 return 0;
506         }
507
508         nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired);
509         if (nposted != nrequired) {
510                 CWARN ("Failed to reserve enough buffers: "
511                        "service %s, %d needed, %d reserved\n",
512                        tsc->tsc_srv_service->sv_name, nrequired, nposted);
513                 srpc_service_remove_buffers(tsc->tsc_srv_service, nposted);
514                 return -ENOMEM;
515         }
516
517         CDEBUG (D_NET, "Reserved %d buffers for test %s\n",
518                 nposted, tsc->tsc_srv_service->sv_name);
519         return 0;
520 }
521
522 void
523 sfw_unload_test (sfw_test_instance_t *tsi)
524 {
525         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
526
527         LASSERT (tsc != NULL);
528
529         if (!tsi->tsi_is_client)
530                 srpc_service_remove_buffers(tsc->tsc_srv_service,
531                                             sfw_test_buffers(tsi));
532         return;
533 }
534
535 void
536 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
537 {
538         srpc_client_rpc_t *rpc;
539         sfw_test_unit_t   *tsu;
540
541         if (!tsi->tsi_is_client) goto clean;
542
543         tsi->tsi_ops->tso_fini(tsi);
544
545         LASSERT (!tsi->tsi_stopping);
546         LASSERT (list_empty(&tsi->tsi_active_rpcs));
547         LASSERT (!sfw_test_active(tsi));
548
549         while (!list_empty(&tsi->tsi_units)) {
550                 tsu = list_entry(tsi->tsi_units.next,
551                                  sfw_test_unit_t, tsu_list);
552                 list_del(&tsu->tsu_list);
553                 LIBCFS_FREE(tsu, sizeof(*tsu));
554         }
555
556         while (!list_empty(&tsi->tsi_free_rpcs)) {
557                 rpc = list_entry(tsi->tsi_free_rpcs.next,
558                                  srpc_client_rpc_t, crpc_list);
559                 list_del(&rpc->crpc_list);
560                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
561         }
562
563 clean:
564         sfw_unload_test(tsi);
565         LIBCFS_FREE(tsi, sizeof(*tsi));
566         return;
567 }
568
569 void
570 sfw_destroy_batch (sfw_batch_t *tsb)
571 {
572         sfw_test_instance_t *tsi;
573
574         LASSERT (!sfw_batch_active(tsb));
575         LASSERT (list_empty(&tsb->bat_list));
576
577         while (!list_empty(&tsb->bat_tests)) {
578                 tsi = list_entry(tsb->bat_tests.next,
579                                  sfw_test_instance_t, tsi_list);
580                 list_del_init(&tsi->tsi_list);
581                 sfw_destroy_test_instance(tsi);
582         }
583
584         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
585         return;
586 }
587
588 void
589 sfw_destroy_session (sfw_session_t *sn)
590 {
591         sfw_batch_t *batch;
592
593         LASSERT (list_empty(&sn->sn_list));
594         LASSERT (sn != sfw_data.fw_session);
595
596         while (!list_empty(&sn->sn_batches)) {
597                 batch = list_entry(sn->sn_batches.next,
598                                    sfw_batch_t, bat_list);
599                 list_del_init(&batch->bat_list);
600                 sfw_destroy_batch(batch);
601         }
602
603         LIBCFS_FREE(sn, sizeof(*sn));
604         atomic_dec(&sfw_data.fw_nzombies);
605         return;
606 }
607
608 void
609 sfw_unpack_test_req (srpc_msg_t *msg)
610 {
611         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
612
613         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
614         LASSERT (req->tsr_is_client);
615
616         if (msg->msg_magic == SRPC_MSG_MAGIC)
617                 return; /* no flipping needed */
618
619         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
620
621         if (req->tsr_service == SRPC_SERVICE_BRW) {
622                 test_bulk_req_t *bulk = &req->tsr_u.bulk;
623
624                 __swab32s(&bulk->blk_opc);
625                 __swab32s(&bulk->blk_npg);
626                 __swab32s(&bulk->blk_flags);
627                 return;
628         }
629
630         if (req->tsr_service == SRPC_SERVICE_PING) {
631                 test_ping_req_t *ping = &req->tsr_u.ping;
632
633                 __swab32s(&ping->png_size);
634                 __swab32s(&ping->png_flags);
635                 return;
636         }
637
638         LBUG ();
639         return;
640 }
641
642 int
643 sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
644 {
645         srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
646         srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
647         srpc_bulk_t         *bk = rpc->srpc_bulk;
648         int                  ndest = req->tsr_ndest;
649         sfw_test_unit_t     *tsu;
650         sfw_test_instance_t *tsi;
651         int                  i;
652         int                  rc;
653
654         LIBCFS_ALLOC(tsi, sizeof(*tsi));
655         if (tsi == NULL) {
656                 CERROR ("Can't allocate test instance for batch: "LPU64"\n",
657                         tsb->bat_id.bat_id);
658                 return -ENOMEM;
659         }
660
661         memset(tsi, 0, sizeof(*tsi));
662         spin_lock_init(&tsi->tsi_lock);
663         atomic_set(&tsi->tsi_nactive, 0);
664         CFS_INIT_LIST_HEAD(&tsi->tsi_units);
665         CFS_INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
666         CFS_INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
667
668         tsi->tsi_stopping      = 0;
669         tsi->tsi_batch         = tsb;
670         tsi->tsi_loop          = req->tsr_loop;
671         tsi->tsi_concur        = req->tsr_concur;
672         tsi->tsi_service       = req->tsr_service;
673         tsi->tsi_is_client     = !!(req->tsr_is_client);
674         tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
675
676         rc = sfw_load_test(tsi);
677         if (rc != 0) {
678                 LIBCFS_FREE(tsi, sizeof(*tsi));
679                 return rc;
680         }
681
682         LASSERT (!sfw_batch_active(tsb));
683
684         if (!tsi->tsi_is_client) {
685                 /* it's test server, just add it to tsb */
686                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
687                 return 0;
688         }
689
690         LASSERT (bk != NULL);
691 #ifndef __KERNEL__
692         LASSERT (bk->bk_pages != NULL);
693 #endif
694         LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= ndest);
695         LASSERT (bk->bk_len >= sizeof(lnet_process_id_t) * ndest);
696
697         sfw_unpack_test_req(msg);
698         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
699
700         for (i = 0; i < ndest; i++) {
701                 lnet_process_id_t *dests;
702                 lnet_process_id_t  id;
703                 int                j;
704
705 #ifdef __KERNEL__
706                 dests = cfs_page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
707                 LASSERT (dests != NULL);  /* my pages are within KVM always */
708 #else
709                 dests = cfs_page_address(bk->bk_pages[i / SFW_ID_PER_PAGE]);
710 #endif
711                 id = dests[i % SFW_ID_PER_PAGE];
712                 if (msg->msg_magic != SRPC_MSG_MAGIC)
713                         sfw_unpack_id(id);
714
715                 for (j = 0; j < tsi->tsi_concur; j++) {
716                         LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
717                         if (tsu == NULL) {
718                                 rc = -ENOMEM;
719                                 CERROR ("Can't allocate tsu for %d\n",
720                                         tsi->tsi_service);
721                                 goto error;
722                         }
723
724                         tsu->tsu_dest     = id;
725                         tsu->tsu_instance = tsi;
726                         tsu->tsu_private  = NULL;
727                         list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
728                 }
729         }
730
731         rc = tsi->tsi_ops->tso_init(tsi);
732         if (rc == 0) {
733                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
734                 return 0;
735         }
736
737 error:
738         LASSERT (rc != 0);
739         sfw_destroy_test_instance(tsi);
740         return rc;
741 }
742
743 static void
744 sfw_test_unit_done (sfw_test_unit_t *tsu)
745 {
746         sfw_test_instance_t *tsi = tsu->tsu_instance;
747         sfw_batch_t         *tsb = tsi->tsi_batch;
748         sfw_session_t       *sn = tsb->bat_session;
749
750         LASSERT (sfw_test_active(tsi));
751
752         if (!atomic_dec_and_test(&tsi->tsi_nactive))
753                 return;
754         
755         /* the test instance is done */
756         spin_lock(&tsi->tsi_lock);
757
758         tsi->tsi_stopping = 0;
759
760         spin_unlock(&tsi->tsi_lock);
761
762         spin_lock(&sfw_data.fw_lock);
763
764         if (!atomic_dec_and_test(&tsb->bat_nactive) || /* tsb still active */
765             sn == sfw_data.fw_session) {               /* sn also active */
766                 spin_unlock(&sfw_data.fw_lock);
767                 return;
768         }
769         
770         LASSERT (!list_empty(&sn->sn_list)); /* I'm a zombie! */
771
772         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
773                 if (sfw_batch_active(tsb)) {
774                         spin_unlock(&sfw_data.fw_lock);
775                         return;
776                 }
777         }
778
779         list_del_init(&sn->sn_list);
780         spin_unlock(&sfw_data.fw_lock);
781
782         sfw_destroy_session(sn);
783         return;
784 }
785
786 void
787 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
788 {
789         sfw_test_unit_t     *tsu = rpc->crpc_priv;
790         sfw_test_instance_t *tsi = tsu->tsu_instance;
791         int                  done = 0;
792
793         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
794                       
795         spin_lock(&tsi->tsi_lock);
796
797         LASSERT (sfw_test_active(tsi));
798         LASSERT (!list_empty(&rpc->crpc_list));
799
800         list_del_init(&rpc->crpc_list);
801
802         /* batch is stopping or loop is done or get error */
803         if (tsi->tsi_stopping ||
804             tsu->tsu_loop == 0 ||
805             (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
806                 done = 1;
807
808         /* dec ref for poster */
809         srpc_client_rpc_decref(rpc);
810
811         spin_unlock(&tsi->tsi_lock);
812
813         if (!done) {
814                 swi_schedule_workitem(&tsu->tsu_worker);
815                 return;
816         }
817
818         sfw_test_unit_done(tsu);
819         return;
820 }
821
822 int
823 sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer,
824                      int nblk, int blklen, srpc_client_rpc_t **rpcpp)
825 {
826         srpc_client_rpc_t   *rpc = NULL;
827         sfw_test_instance_t *tsi = tsu->tsu_instance;
828         
829         spin_lock(&tsi->tsi_lock);
830
831         LASSERT (sfw_test_active(tsi));
832
833         if (!list_empty(&tsi->tsi_free_rpcs)) {
834                 /* pick request from buffer */
835                 rpc = list_entry(tsi->tsi_free_rpcs.next,
836                                  srpc_client_rpc_t, crpc_list);
837                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
838                 list_del_init(&rpc->crpc_list);
839
840                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
841                                      blklen, sfw_test_rpc_done,
842                                      sfw_test_rpc_fini, tsu);
843         }
844
845         spin_unlock(&tsi->tsi_lock);
846         
847         if (rpc == NULL)
848                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
849                                              blklen, sfw_test_rpc_done, 
850                                              sfw_test_rpc_fini, tsu);
851         if (rpc == NULL) {
852                 CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
853                 return -ENOMEM;
854         }
855
856         *rpcpp = rpc;
857         return 0;
858 }
859
860 int
861 sfw_run_test (swi_workitem_t *wi)
862 {
863         sfw_test_unit_t     *tsu = wi->wi_data;
864         sfw_test_instance_t *tsi = tsu->tsu_instance;
865         srpc_client_rpc_t   *rpc = NULL;
866
867         LASSERT (wi == &tsu->tsu_worker);
868
869         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
870                 LASSERT (rpc == NULL);
871                 goto test_done;
872         }
873
874         LASSERT (rpc != NULL);
875
876         spin_lock(&tsi->tsi_lock);
877
878         if (tsi->tsi_stopping) {
879                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
880                 spin_unlock(&tsi->tsi_lock);
881                 goto test_done;
882         }
883
884         if (tsu->tsu_loop > 0)
885                 tsu->tsu_loop--;
886
887         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
888         spin_unlock(&tsi->tsi_lock);
889
890         rpc->crpc_timeout = SFW_TEST_RPC_TIMEOUT;
891
892         spin_lock(&rpc->crpc_lock);
893         srpc_post_rpc(rpc);
894         spin_unlock(&rpc->crpc_lock);
895         return 0;
896
897 test_done:
898         /*
899          * No one can schedule me now since:
900          * - previous RPC, if any, has done and
901          * - no new RPC is initiated.
902          * - my batch is still active; no one can run it again now.
903          * Cancel pending schedules and prevent future schedule attempts:
904          */
905         swi_kill_workitem(wi);
906         sfw_test_unit_done(tsu);
907         return 1;
908 }
909
910 int
911 sfw_run_batch (sfw_batch_t *tsb)
912 {
913         swi_workitem_t      *wi;
914         sfw_test_unit_t     *tsu;
915         sfw_test_instance_t *tsi;
916
917         if (sfw_batch_active(tsb)) {
918                 CDEBUG (D_NET, "Can't start active batch: "LPU64" (%d)\n",
919                         tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
920                 return -EPERM;
921         }
922
923         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
924                 if (!tsi->tsi_is_client) /* skip server instances */
925                         continue;
926
927                 LASSERT (!tsi->tsi_stopping);
928                 LASSERT (!sfw_test_active(tsi));
929
930                 atomic_inc(&tsb->bat_nactive);
931
932                 list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) {
933                         atomic_inc(&tsi->tsi_nactive);
934                         tsu->tsu_loop = tsi->tsi_loop;
935                         wi = &tsu->tsu_worker;
936                         swi_init_workitem(wi, tsu, sfw_run_test);
937                         swi_schedule_workitem(wi);
938                 }
939         }
940
941         return 0;
942 }
943
944 int
945 sfw_stop_batch (sfw_batch_t *tsb, int force)
946 {
947         sfw_test_instance_t *tsi;
948         srpc_client_rpc_t   *rpc;
949
950         if (!sfw_batch_active(tsb))
951                 return -EPERM;
952
953         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
954                 spin_lock(&tsi->tsi_lock);
955
956                 if (!tsi->tsi_is_client ||
957                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
958                         spin_unlock(&tsi->tsi_lock);
959                         continue;
960                 }
961
962                 tsi->tsi_stopping = 1;
963
964                 if (!force) {
965                         spin_unlock(&tsi->tsi_lock);
966                         continue;
967                 }
968
969                 /* abort launched rpcs in the test */
970                 list_for_each_entry (rpc, &tsi->tsi_active_rpcs, crpc_list) {
971                         spin_lock(&rpc->crpc_lock);
972
973                         srpc_abort_rpc(rpc, -EINTR);
974
975                         spin_unlock(&rpc->crpc_lock);
976                 }
977
978                 spin_unlock(&tsi->tsi_lock);
979         }
980
981         return 0;
982 }
983
984 int
985 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
986 {
987         sfw_test_instance_t *tsi;
988
989         if (testidx < 0)
990                 return -EINVAL;
991
992         if (testidx == 0) {
993                 reply->bar_active = atomic_read(&tsb->bat_nactive);
994                 return 0;
995         }
996
997         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
998                 if (testidx-- > 1)
999                         continue;
1000
1001                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
1002                 return 0;
1003         }
1004
1005         return -ENOENT;
1006 }
1007
1008 void
1009 sfw_free_pages (srpc_server_rpc_t *rpc)
1010 {
1011         srpc_free_bulk(rpc->srpc_bulk);
1012         rpc->srpc_bulk = NULL;
1013 }
1014
1015 int
1016 sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink)
1017 {
1018         LASSERT (rpc->srpc_bulk == NULL);
1019         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
1020
1021         rpc->srpc_bulk = srpc_alloc_bulk(npages, sink);
1022         if (rpc->srpc_bulk == NULL) return -ENOMEM;
1023
1024         return 0;
1025 }
1026
1027 int
1028 sfw_add_test (srpc_server_rpc_t *rpc)
1029 {
1030         sfw_session_t     *sn = sfw_data.fw_session;
1031         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1032         srpc_test_reqst_t *request;
1033         int                rc;
1034         sfw_batch_t       *bat;
1035
1036         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1037         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1038
1039         if (request->tsr_loop == 0 ||
1040             request->tsr_concur == 0 ||
1041             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1042             request->tsr_ndest > SFW_MAX_NDESTS ||
1043             (request->tsr_is_client && request->tsr_ndest == 0) ||
1044             request->tsr_concur > SFW_MAX_CONCUR ||
1045             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1046             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1047                 reply->tsr_status = EINVAL;
1048                 return 0;
1049         }
1050
1051         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1052             sfw_find_test_case(request->tsr_service) == NULL) {
1053                 reply->tsr_status = ENOENT;
1054                 return 0;
1055         }
1056
1057         bat = sfw_bid2batch(request->tsr_bid);
1058         if (bat == NULL) {
1059                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
1060                         rpc->srpc_service->sv_name,
1061                         libcfs_id2str(rpc->srpc_peer));
1062                 return -ENOMEM;
1063         }
1064
1065         if (sfw_batch_active(bat)) {
1066                 reply->tsr_status = EBUSY;
1067                 return 0;
1068         }
1069
1070         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1071                 /* rpc will be resumed later in sfw_bulk_ready */
1072                 return sfw_alloc_pages(rpc,
1073                                        sfw_id_pages(request->tsr_ndest), 1);
1074         }
1075
1076         rc = sfw_add_test_instance(bat, rpc);
1077         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1078                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1079                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1080                 request->tsr_is_client ? "client" : "server",
1081                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1082
1083         reply->tsr_status = (rc < 0) ? -rc : rc;
1084         return 0;
1085 }
1086
1087 int
1088 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1089 {
1090         sfw_session_t *sn = sfw_data.fw_session;
1091         int            rc = 0;
1092         sfw_batch_t   *bat;
1093
1094         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1095
1096         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1097                 reply->bar_status = ESRCH;
1098                 return 0;
1099         }
1100
1101         bat = sfw_find_batch(request->bar_bid);
1102         if (bat == NULL) {
1103                 reply->bar_status = ENOENT;
1104                 return 0;
1105         }
1106
1107         switch (request->bar_opc) {
1108         case SRPC_BATCH_OPC_RUN:
1109                 rc = sfw_run_batch(bat);
1110                 break;
1111
1112         case SRPC_BATCH_OPC_STOP:
1113                 rc = sfw_stop_batch(bat, request->bar_arg);
1114                 break;
1115
1116         case SRPC_BATCH_OPC_QUERY:
1117                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1118                 break;
1119
1120         default:
1121                 return -EINVAL; /* drop it */
1122         }
1123
1124         reply->bar_status = (rc < 0) ? -rc : rc;
1125         return 0;
1126 }
1127
1128 int
1129 sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
1130 {
1131         srpc_service_t *sv = rpc->srpc_service;
1132         srpc_msg_t     *reply = &rpc->srpc_replymsg;
1133         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1134         int             rc = 0;
1135
1136         LASSERT (sfw_data.fw_active_srpc == NULL);
1137         LASSERT (sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1138
1139         spin_lock(&sfw_data.fw_lock);
1140
1141         if (sfw_data.fw_shuttingdown) {
1142                 spin_unlock(&sfw_data.fw_lock);
1143                 return -ESHUTDOWN;
1144         }
1145
1146         /* Remove timer to avoid racing with it or expiring active session */
1147         if (sfw_del_session_timer() != 0) {
1148                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer.",
1149                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1150                 spin_unlock(&sfw_data.fw_lock);
1151                 return -EAGAIN;
1152         }
1153
1154         sfw_data.fw_active_srpc = rpc;
1155         spin_unlock(&sfw_data.fw_lock);
1156
1157         sfw_unpack_message(request);
1158         LASSERT (request->msg_type == srpc_service2request(sv->sv_id));
1159
1160         switch(sv->sv_id) {
1161         default:
1162                 LBUG ();
1163         case SRPC_SERVICE_TEST:
1164                 rc = sfw_add_test(rpc);
1165                 break;
1166
1167         case SRPC_SERVICE_BATCH:
1168                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1169                                        &reply->msg_body.bat_reply);
1170                 break;
1171
1172         case SRPC_SERVICE_QUERY_STAT:
1173                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1174                                    &reply->msg_body.stat_reply);
1175                 break;
1176
1177         case SRPC_SERVICE_DEBUG:
1178                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1179                                        &reply->msg_body.dbg_reply);
1180                 break;
1181
1182         case SRPC_SERVICE_MAKE_SESSION:
1183                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1184                                       &reply->msg_body.mksn_reply);
1185                 break;
1186
1187         case SRPC_SERVICE_REMOVE_SESSION:
1188                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1189                                         &reply->msg_body.rmsn_reply);
1190                 break;
1191         }
1192
1193         rpc->srpc_done = sfw_server_rpc_done;
1194         spin_lock(&sfw_data.fw_lock);
1195
1196 #ifdef __KERNEL__
1197         if (!sfw_data.fw_shuttingdown)
1198                 sfw_add_session_timer();
1199 #else
1200         LASSERT (!sfw_data.fw_shuttingdown);
1201         sfw_add_session_timer();
1202 #endif
1203
1204         sfw_data.fw_active_srpc = NULL;
1205         spin_unlock(&sfw_data.fw_lock);
1206         return rc;
1207 }
1208
1209 int
1210 sfw_bulk_ready (srpc_server_rpc_t *rpc, int status)
1211 {
1212         srpc_service_t *sv = rpc->srpc_service;
1213         int             rc;
1214
1215         LASSERT (rpc->srpc_bulk != NULL);
1216         LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
1217         LASSERT (sfw_data.fw_active_srpc == NULL);
1218         LASSERT (rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1219
1220         spin_lock(&sfw_data.fw_lock);
1221
1222         if (status != 0) {
1223                 CERROR ("Bulk transfer failed for RPC: "
1224                         "service %s, peer %s, status %d\n",
1225                         sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1226                 spin_unlock(&sfw_data.fw_lock);
1227                 return -EIO;
1228         }
1229
1230         if (sfw_data.fw_shuttingdown) {
1231                 spin_unlock(&sfw_data.fw_lock);
1232                 return -ESHUTDOWN;
1233         }
1234
1235         if (sfw_del_session_timer() != 0) {
1236                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer",
1237                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1238                 spin_unlock(&sfw_data.fw_lock);
1239                 return -EAGAIN;
1240         }
1241
1242         sfw_data.fw_active_srpc = rpc;
1243         spin_unlock(&sfw_data.fw_lock);
1244
1245         rc = sfw_add_test(rpc);
1246
1247         spin_lock(&sfw_data.fw_lock);
1248
1249 #ifdef __KERNEL__
1250         if (!sfw_data.fw_shuttingdown)
1251                 sfw_add_session_timer();
1252 #else
1253         LASSERT (!sfw_data.fw_shuttingdown);
1254         sfw_add_session_timer();
1255 #endif
1256
1257         sfw_data.fw_active_srpc = NULL;
1258         spin_unlock(&sfw_data.fw_lock);
1259         return rc;
1260 }
1261
1262 srpc_client_rpc_t *
1263 sfw_create_rpc (lnet_process_id_t peer, int service,
1264                 int nbulkiov, int bulklen,
1265                 void (*done) (srpc_client_rpc_t *), void *priv)
1266 {
1267         srpc_client_rpc_t *rpc;
1268
1269         spin_lock(&sfw_data.fw_lock);
1270
1271         LASSERT (!sfw_data.fw_shuttingdown);
1272         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1273
1274         if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1275                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1276                                  srpc_client_rpc_t, crpc_list);
1277                 list_del(&rpc->crpc_list);
1278                 spin_unlock(&sfw_data.fw_lock);
1279
1280                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1281                                      done, sfw_client_rpc_fini, priv);
1282                 return rpc;
1283         }
1284
1285         spin_unlock(&sfw_data.fw_lock);
1286
1287         rpc = srpc_create_client_rpc(peer, service, nbulkiov, bulklen, done,
1288                                      nbulkiov != 0 ? NULL : sfw_client_rpc_fini,
1289                                      priv);
1290         return rpc;
1291 }
1292
1293 void
1294 sfw_unpack_message (srpc_msg_t *msg)
1295 {
1296         if (msg->msg_magic == SRPC_MSG_MAGIC)
1297                 return; /* no flipping needed */
1298
1299         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1300
1301         __swab32s(&msg->msg_type);
1302
1303         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1304                 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1305
1306                 __swab32s(&req->str_type);
1307                 __swab64s(&req->str_rpyid);
1308                 sfw_unpack_sid(req->str_sid);
1309                 return;
1310         }
1311
1312         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1313                 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1314
1315                 __swab32s(&rep->str_status);
1316                 sfw_unpack_sid(rep->str_sid);
1317                 sfw_unpack_fw_counters(rep->str_fw);
1318                 sfw_unpack_rpc_counters(rep->str_rpc);
1319                 sfw_unpack_lnet_counters(rep->str_lnet);
1320                 return;
1321         }
1322
1323         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1324                 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1325
1326                 __swab64s(&req->mksn_rpyid);
1327                 __swab32s(&req->mksn_force);
1328                 sfw_unpack_sid(req->mksn_sid);
1329                 return;
1330         }
1331
1332         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1333                 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1334
1335                 __swab32s(&rep->mksn_status);
1336                 __swab32s(&rep->mksn_timeout);
1337                 sfw_unpack_sid(rep->mksn_sid);
1338                 return;
1339         }
1340
1341         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1342                 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1343
1344                 __swab64s(&req->rmsn_rpyid);
1345                 sfw_unpack_sid(req->rmsn_sid);
1346                 return;
1347         }
1348
1349         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1350                 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1351
1352                 __swab32s(&rep->rmsn_status);
1353                 sfw_unpack_sid(rep->rmsn_sid);
1354                 return;
1355         }
1356
1357         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1358                 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1359
1360                 __swab64s(&req->dbg_rpyid);
1361                 __swab32s(&req->dbg_flags);
1362                 sfw_unpack_sid(req->dbg_sid);
1363                 return;
1364         }
1365
1366         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1367                 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1368
1369                 __swab32s(&rep->dbg_nbatch);
1370                 __swab32s(&rep->dbg_timeout);
1371                 sfw_unpack_sid(rep->dbg_sid);
1372                 return;
1373         }
1374
1375         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1376                 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1377
1378                 __swab32s(&req->bar_opc);
1379                 __swab64s(&req->bar_rpyid);
1380                 __swab32s(&req->bar_testidx);
1381                 __swab32s(&req->bar_arg);
1382                 sfw_unpack_sid(req->bar_sid);
1383                 __swab64s(&req->bar_bid.bat_id);
1384                 return;
1385         }
1386
1387         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1388                 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1389
1390                 __swab32s(&rep->bar_status);
1391                 sfw_unpack_sid(rep->bar_sid);
1392                 return;
1393         }
1394
1395         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1396                 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1397
1398                 __swab64s(&req->tsr_rpyid);
1399                 __swab64s(&req->tsr_bulkid);
1400                 __swab32s(&req->tsr_loop);
1401                 __swab32s(&req->tsr_ndest);
1402                 __swab32s(&req->tsr_concur);
1403                 __swab32s(&req->tsr_service);
1404                 sfw_unpack_sid(req->tsr_sid);
1405                 __swab64s(&req->tsr_bid.bat_id);
1406                 return;
1407         }
1408
1409         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1410                 srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1411
1412                 __swab32s(&rep->tsr_status);
1413                 sfw_unpack_sid(rep->tsr_sid);
1414                 return;
1415         }
1416
1417         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1418                 srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1419
1420                 __swab64s(&req->join_rpyid);
1421                 sfw_unpack_sid(req->join_sid);
1422                 return;
1423         }
1424
1425         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1426                 srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1427
1428                 __swab32s(&rep->join_status);
1429                 __swab32s(&rep->join_timeout);
1430                 sfw_unpack_sid(rep->join_sid);
1431                 return;
1432         }
1433
1434         LBUG ();
1435         return;
1436 }
1437
1438 void
1439 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1440 {
1441         LASSERT (atomic_read(&rpc->crpc_refcount) > 0);
1442         LASSERT (rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1443
1444         spin_lock(&rpc->crpc_lock);
1445         srpc_abort_rpc(rpc, -EINTR);
1446         spin_unlock(&rpc->crpc_lock);
1447         return;
1448 }
1449
1450 void
1451 sfw_post_rpc (srpc_client_rpc_t *rpc)
1452 {
1453         spin_lock(&rpc->crpc_lock);
1454
1455         LASSERT (!rpc->crpc_closed);
1456         LASSERT (!rpc->crpc_aborted);
1457         LASSERT (list_empty(&rpc->crpc_list));
1458         LASSERT (!sfw_data.fw_shuttingdown);
1459
1460         rpc->crpc_timeout = SFW_CLIENT_RPC_TIMEOUT;
1461         srpc_post_rpc(rpc);
1462
1463         spin_unlock(&rpc->crpc_lock);
1464         return;
1465 }
1466
1467 static srpc_service_t sfw_services[] = 
1468 {
1469         {
1470                 .sv_name = "debug",
1471                 .sv_id   = SRPC_SERVICE_DEBUG,
1472         },
1473         {
1474                 .sv_name = "query stats",
1475                 .sv_id   = SRPC_SERVICE_QUERY_STAT,
1476         },
1477         {
1478                 .sv_name = "make sessin",
1479                 .sv_id   = SRPC_SERVICE_MAKE_SESSION,
1480         },
1481         {
1482                 .sv_name = "remove session",
1483                 .sv_id   = SRPC_SERVICE_REMOVE_SESSION,
1484         },
1485         {
1486                 .sv_name = "batch service",
1487                 .sv_id   = SRPC_SERVICE_BATCH,
1488         },
1489         {
1490                 .sv_name = "test service",
1491                 .sv_id   = SRPC_SERVICE_TEST,
1492         },
1493         {       .sv_name = NULL, }
1494 };
1495
1496 extern sfw_test_client_ops_t ping_test_client;
1497 extern srpc_service_t        ping_test_service;
1498
1499 extern sfw_test_client_ops_t brw_test_client;
1500 extern srpc_service_t        brw_test_service;
1501
1502 int
1503 sfw_startup (void)
1504 {
1505         int              i;
1506         int              rc;
1507         int              error;
1508         srpc_service_t  *sv;
1509         sfw_test_case_t *tsc;
1510
1511 #ifndef __KERNEL__
1512         char *s;
1513
1514         s = getenv("SESSION_TIMEOUT");
1515         session_timeout = s != NULL ? atoi(s) : session_timeout;
1516
1517         s = getenv("BRW_INJECT_ERRORS");
1518         brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors;
1519 #endif
1520
1521         if (session_timeout < 0) {
1522                 CERROR ("Session timeout must be non-negative: %d\n",
1523                         session_timeout);
1524                 return -EINVAL;
1525         }
1526
1527         if (session_timeout == 0)
1528                 CWARN ("Zero session_timeout specified "
1529                        "- test sessions never expire.\n");
1530
1531         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1532
1533         sfw_data.fw_session     = NULL;
1534         sfw_data.fw_active_srpc = NULL;
1535         spin_lock_init(&sfw_data.fw_lock);
1536         atomic_set(&sfw_data.fw_nzombies, 0);
1537         CFS_INIT_LIST_HEAD(&sfw_data.fw_tests);
1538         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1539         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1540
1541         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1542         LASSERT (rc == 0);
1543         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1544         LASSERT (rc == 0);
1545
1546         error = 0;
1547         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1548                 sv = tsc->tsc_srv_service;
1549                 sv->sv_concur = SFW_TEST_CONCURRENCY;
1550
1551                 rc = srpc_add_service(sv);
1552                 LASSERT (rc != -EBUSY);
1553                 if (rc != 0) {
1554                         CWARN ("Failed to add %s service: %d\n",
1555                                sv->sv_name, rc);
1556                         error = rc;
1557                 }
1558         }
1559
1560         for (i = 0; ; i++) {
1561                 sv = &sfw_services[i];
1562                 if (sv->sv_name == NULL) break;
1563
1564                 sv->sv_bulk_ready = NULL;
1565                 sv->sv_handler    = sfw_handle_server_rpc;
1566                 sv->sv_concur     = SFW_SERVICE_CONCURRENCY;
1567                 if (sv->sv_id == SRPC_SERVICE_TEST)
1568                         sv->sv_bulk_ready = sfw_bulk_ready;
1569
1570                 rc = srpc_add_service(sv);
1571                 LASSERT (rc != -EBUSY);
1572                 if (rc != 0) {
1573                         CWARN ("Failed to add %s service: %d\n",
1574                                sv->sv_name, rc);
1575                         error = rc;
1576                 }
1577
1578                 /* about to sfw_shutdown, no need to add buffer */
1579                 if (error) continue; 
1580
1581                 rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS);
1582                 if (rc != SFW_POST_BUFFERS) {
1583                         CWARN ("Failed to reserve enough buffers: "
1584                                "service %s, %d needed, %d reserved\n",
1585                                sv->sv_name, SFW_POST_BUFFERS, rc);
1586                         error = -ENOMEM;
1587                 }
1588         }
1589
1590         if (error != 0)
1591                 sfw_shutdown();
1592         return error;
1593 }
1594
1595 void
1596 sfw_shutdown (void)
1597 {
1598         srpc_service_t  *sv;
1599         sfw_test_case_t *tsc;
1600         int              i;
1601
1602         spin_lock(&sfw_data.fw_lock);
1603
1604         sfw_data.fw_shuttingdown = 1;
1605 #ifdef __KERNEL__
1606         lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1607                        "waiting for active RPC to finish.\n");
1608 #else
1609         LASSERT (sfw_data.fw_active_srpc == NULL);
1610 #endif
1611
1612         if (sfw_del_session_timer() != 0)
1613                 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1614                                "waiting for session timer to explode.\n");
1615
1616         sfw_deactivate_session();
1617         lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1618                        sfw_data.fw_lock,
1619                        "waiting for %d zombie sessions to die.\n",
1620                        atomic_read(&sfw_data.fw_nzombies));
1621
1622         spin_unlock(&sfw_data.fw_lock);
1623
1624         for (i = 0; ; i++) {
1625                 sv = &sfw_services[i];
1626                 if (sv->sv_name == NULL)
1627                         break;
1628
1629                 srpc_shutdown_service(sv);
1630                 srpc_remove_service(sv);
1631         }
1632
1633         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1634                 sv = tsc->tsc_srv_service;
1635                 srpc_shutdown_service(sv);
1636                 srpc_remove_service(sv);
1637         }
1638
1639         while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1640                 srpc_client_rpc_t *rpc;
1641
1642                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next, 
1643                                  srpc_client_rpc_t, crpc_list);
1644                 list_del(&rpc->crpc_list);
1645
1646                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1647         }
1648
1649         for (i = 0; ; i++) {
1650                 sv = &sfw_services[i];
1651                 if (sv->sv_name == NULL)
1652                         break;
1653
1654                 srpc_wait_service_shutdown(sv);
1655         }
1656
1657         while (!list_empty(&sfw_data.fw_tests)) {
1658                 tsc = list_entry(sfw_data.fw_tests.next,
1659                                  sfw_test_case_t, tsc_list);
1660                 
1661                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1662
1663                 list_del(&tsc->tsc_list);
1664                 LIBCFS_FREE(tsc, sizeof(*tsc));
1665         }
1666
1667         return;
1668 }