Whamcloud - gitweb
Include liblustre only after the libcfs includes to avoid breaking the build.
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Authors: Isaac Huang <isaac@clusterfs.com>
6  *            Liang Zhen  <liangzhen@clusterfs.com>
7  */
8
9 #define DEBUG_SUBSYSTEM S_LNET
10
11 #include "selftest.h"
12
13 int brw_inject_errors = 0;
14 CFS_MODULE_PARM(brw_inject_errors, "i", int, 0644,
15                 "# data errors to inject randomly, zero by default");
16
17 static int session_timeout = 100;
18 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
19                 "test session timeout in seconds (100 by default, 0 == never)");
20
/* Framework tunables. */
#define SFW_TEST_CONCURRENCY   128
/* assigned to crpc_timeout of test RPCs; presumably seconds - TODO confirm */
#define SFW_TEST_RPC_TIMEOUT   64
/* in seconds */
#define SFW_CLIENT_RPC_TIMEOUT 64
/* tolerate buggy peers with extra buffers */
#define SFW_EXTRA_TEST_BUFFERS 8

/* Number of service buffers a test instance asks for: its loop count
 * plus the slack above. */
#define sfw_test_buffers(tsi)  ((tsi)->tsi_loop + SFW_EXTRA_TEST_BUFFERS)
27
/* Byte-swap, in place, the nid (64 bit) and pid (32 bit) members of the
 * process id lvalue 'id'. */
#define sfw_unpack_id(id)                       \
do {                                            \
        __swab64s(&(id).nid);                   \
        __swab32s(&(id).pid);                   \
} while (0)
33
/* Byte-swap, in place, both 64-bit members (ses_nid, ses_stamp) of the
 * session id lvalue 'sid'. */
#define sfw_unpack_sid(sid)                     \
do {                                            \
        __swab64s(&(sid).ses_nid);              \
        __swab64s(&(sid).ses_stamp);            \
} while (0)
39
/* Byte-swap, in place, every (32-bit) framework counter in the lvalue
 * 'fc'. */
#define sfw_unpack_fw_counters(fc)              \
do {                                            \
        __swab32s(&(fc).brw_errors);            \
        __swab32s(&(fc).ping_errors);           \
        __swab32s(&(fc).active_tests);          \
        __swab32s(&(fc).active_batches);        \
        __swab32s(&(fc).zombie_sessions);       \
} while (0)
48
/* Byte-swap, in place, the RPC counters in the lvalue 'rc': five 32-bit
 * event counts followed by the two 64-bit bulk totals. */
#define sfw_unpack_rpc_counters(rc)             \
do {                                            \
        __swab32s(&(rc).errors);                \
        __swab32s(&(rc).rpcs_sent);             \
        __swab32s(&(rc).rpcs_rcvd);             \
        __swab32s(&(rc).rpcs_dropped);          \
        __swab32s(&(rc).rpcs_expired);          \
        __swab64s(&(rc).bulk_get);              \
        __swab64s(&(rc).bulk_put);              \
} while (0)
59
/* Byte-swap, in place, the LNet counters in the lvalue 'lc': seven
 * 32-bit counts followed by the four 64-bit length totals. */
#define sfw_unpack_lnet_counters(lc)            \
do {                                            \
        __swab32s(&(lc).errors);                \
        __swab32s(&(lc).msgs_max);              \
        __swab32s(&(lc).msgs_alloc);            \
        __swab32s(&(lc).send_count);            \
        __swab32s(&(lc).recv_count);            \
        __swab32s(&(lc).drop_count);            \
        __swab32s(&(lc).route_count);           \
        __swab64s(&(lc).send_length);           \
        __swab64s(&(lc).recv_length);           \
        __swab64s(&(lc).drop_length);           \
        __swab64s(&(lc).route_length);          \
} while (0)
74
/* A test instance / batch counts as active while its nactive counter is
 * non-zero. */
#define sfw_test_active(t)   (atomic_read(&(t)->tsi_nactive) != 0)
#define sfw_batch_active(b)  (atomic_read(&(b)->bat_nactive) != 0)
77
78 struct smoketest_framework {
79         struct list_head   fw_zombie_rpcs;     /* RPCs to be recycled */
80         struct list_head   fw_zombie_sessions; /* stopping sessions */
81         struct list_head   fw_tests;           /* registered test cases */
82         atomic_t           fw_nzombies;        /* # zombie sessions */
83         spinlock_t         fw_lock;            /* serialise */
84         sfw_session_t     *fw_session;         /* _the_ session */
85         int                fw_shuttingdown;    /* shutdown in progress */
86         srpc_server_rpc_t *fw_active_srpc;     /* running RPC */
87 } sfw_data;
88
89 /* forward ref's */
90 int sfw_stop_batch (sfw_batch_t *tsb, int force);
91 void sfw_destroy_session (sfw_session_t *sn);
92
93 static inline sfw_test_case_t *
94 sfw_find_test_case(int id)
95 {
96         sfw_test_case_t *tsc;
97
98         LASSERT (id <= SRPC_SERVICE_MAX_ID);
99         LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
100
101         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
102                 if (tsc->tsc_srv_service->sv_id == id)
103                         return tsc;
104         }
105
106         return NULL;
107 }
108
109 static int
110 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
111 {
112         sfw_test_case_t *tsc;
113
114         if (sfw_find_test_case(service->sv_id) != NULL) {
115                 CERROR ("Failed to register test %s (%d)\n",
116                         service->sv_name, service->sv_id);
117                 return -EEXIST;
118         }
119
120         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
121         if (tsc == NULL)
122                 return -ENOMEM;
123
124         memset(tsc, 0, sizeof(sfw_test_case_t));
125         tsc->tsc_cli_ops     = cliops;
126         tsc->tsc_srv_service = service;
127
128         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
129         return 0;
130 }
131
132 void
133 sfw_add_session_timer (void)
134 {
135         sfw_session_t *sn = sfw_data.fw_session;
136         stt_timer_t   *timer = &sn->sn_timer;
137
138         LASSERT (!sfw_data.fw_shuttingdown);
139
140         if (sn == NULL || sn->sn_timeout == 0)
141                 return;
142
143         LASSERT (!sn->sn_timer_active);
144
145         sn->sn_timer_active = 1;
146         timer->stt_expires = cfs_time_add(sn->sn_timeout,
147                                           cfs_time_current_sec());
148         stt_add_timer(timer);
149         return;
150 }
151
152 int
153 sfw_del_session_timer (void)
154 {
155         sfw_session_t *sn = sfw_data.fw_session;
156
157         if (sn == NULL || !sn->sn_timer_active)
158                 return 0;
159
160         LASSERT (sn->sn_timeout != 0);
161
162         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
163                 sn->sn_timer_active = 0;
164                 return 0;
165         }
166
167 #ifndef __KERNEL__
168         /* Racing is impossible in single-threaded userland selftest */
169         LBUG();
170 #endif
171         return EBUSY; /* racing with sfw_session_expired() */
172 }
173
174 /* called with sfw_data.fw_lock held */
175 static void
176 sfw_deactivate_session (void)
177 {
178         sfw_session_t *sn = sfw_data.fw_session;
179         int            nactive = 0;
180         sfw_batch_t   *tsb;
181
182         if (sn == NULL) return;
183
184         LASSERT (!sn->sn_timer_active);
185
186         sfw_data.fw_session = NULL;
187         atomic_inc(&sfw_data.fw_nzombies);
188         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
189
190         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
191                 if (sfw_batch_active(tsb)) {
192                         nactive++;
193                         sfw_stop_batch(tsb, 1);
194                 }
195         }
196
197         if (nactive != 0)
198                 return;   /* wait for active batches to stop */
199
200         list_del_init(&sn->sn_list);
201         spin_unlock(&sfw_data.fw_lock);
202
203         sfw_destroy_session(sn);
204
205         spin_lock(&sfw_data.fw_lock);
206         return;
207 }
208
209 #ifndef __KERNEL__
210
211 int
212 sfw_session_removed (void)
213 {
214         return (sfw_data.fw_session == NULL) ? 1 : 0;
215 }
216
217 #endif
218
219 void
220 sfw_session_expired (void *data)
221 {
222         sfw_session_t *sn = data;
223
224         spin_lock(&sfw_data.fw_lock);
225
226         LASSERT (sn->sn_timer_active);
227         LASSERT (sn == sfw_data.fw_session);
228
229         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
230                libcfs_nid2str(sn->sn_id.ses_nid),
231                sn->sn_id.ses_stamp, &sn->sn_name[0]);
232
233         sn->sn_timer_active = 0;
234         sfw_deactivate_session();
235
236         spin_unlock(&sfw_data.fw_lock);
237         return;
238 }
239
240 static inline void
241 sfw_init_session (sfw_session_t *sn, lst_sid_t sid, const char *name)
242 {
243         stt_timer_t *timer = &sn->sn_timer;
244
245         memset(sn, 0, sizeof(sfw_session_t));
246         CFS_INIT_LIST_HEAD(&sn->sn_list);
247         CFS_INIT_LIST_HEAD(&sn->sn_batches);
248         atomic_set(&sn->sn_brw_errors, 0);
249         atomic_set(&sn->sn_ping_errors, 0);
250         strncpy(&sn->sn_name[0], name, LST_NAME_SIZE);
251
252         sn->sn_timer_active = 0;
253         sn->sn_id           = sid;
254         sn->sn_timeout      = session_timeout;
255
256         timer->stt_data = sn;
257         timer->stt_func = sfw_session_expired;
258         CFS_INIT_LIST_HEAD(&timer->stt_list);
259 }
260
261 /* completion handler for incoming framework RPCs */
262 void
263 sfw_server_rpc_done (srpc_server_rpc_t *rpc)
264 {
265         srpc_service_t *sv = rpc->srpc_service;
266         int             status = rpc->srpc_status;
267
268         CDEBUG (D_NET,
269                 "Incoming framework RPC done: "
270                 "service %s, peer %s, status %s:%d\n",
271                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
272                 swi_state2str(rpc->srpc_wi.wi_state),
273                 status);
274
275         if (rpc->srpc_bulk != NULL)
276                 sfw_free_pages(rpc);
277         return;
278 }
279
280 void
281 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
282 {
283         LASSERT (rpc->crpc_bulk.bk_niov == 0);
284         LASSERT (list_empty(&rpc->crpc_list));
285         LASSERT (atomic_read(&rpc->crpc_refcount) == 0);
286 #ifndef __KERNEL__
287         LASSERT (rpc->crpc_bulk.bk_pages == NULL);
288 #endif
289
290         CDEBUG (D_NET,
291                 "Outgoing framework RPC done: "
292                 "service %d, peer %s, status %s:%d:%d\n",
293                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
294                 swi_state2str(rpc->crpc_wi.wi_state),
295                 rpc->crpc_aborted, rpc->crpc_status);
296
297         spin_lock(&sfw_data.fw_lock);
298
299         /* my callers must finish all RPCs before shutting me down */
300         LASSERT (!sfw_data.fw_shuttingdown);
301         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
302
303         spin_unlock(&sfw_data.fw_lock);
304         return;
305 }
306
307 sfw_batch_t *
308 sfw_find_batch (lst_bid_t bid)
309 {
310         sfw_session_t *sn = sfw_data.fw_session;
311         sfw_batch_t   *bat;
312
313         LASSERT (sn != NULL);
314
315         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
316                 if (bat->bat_id.bat_id == bid.bat_id)
317                         return bat;
318         }
319
320         return NULL;
321 }
322
323 sfw_batch_t *
324 sfw_bid2batch (lst_bid_t bid)
325 {
326         sfw_session_t *sn = sfw_data.fw_session;
327         sfw_batch_t   *bat;
328
329         LASSERT (sn != NULL);
330
331         bat = sfw_find_batch(bid);
332         if (bat != NULL)
333                 return bat;
334
335         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
336         if (bat == NULL) 
337                 return NULL;
338
339         bat->bat_error    = 0;
340         bat->bat_session  = sn;
341         bat->bat_id       = bid;
342         atomic_set(&bat->bat_nactive, 0);
343         CFS_INIT_LIST_HEAD(&bat->bat_tests);
344
345         list_add_tail(&bat->bat_list, &sn->sn_batches);
346         return bat;
347 }
348
349 int
350 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
351 {
352         sfw_session_t  *sn = sfw_data.fw_session;
353         sfw_counters_t *cnt = &reply->str_fw;
354         sfw_batch_t    *bat;
355
356         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
357
358         if (request->str_sid.ses_nid == LNET_NID_ANY) {
359                 reply->str_status = EINVAL;
360                 return 0;
361         }
362
363         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
364                 reply->str_status = ESRCH;
365                 return 0;
366         }
367
368         LNET_LOCK();
369         reply->str_lnet = the_lnet.ln_counters;
370         LNET_UNLOCK();
371
372         srpc_get_counters(&reply->str_rpc);
373
374         cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
375         cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
376         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
377
378         cnt->active_tests = cnt->active_batches = 0;
379         list_for_each_entry (bat, &sn->sn_batches, bat_list) {
380                 int n = atomic_read(&bat->bat_nactive);
381
382                 if (n > 0) {
383                         cnt->active_batches++;
384                         cnt->active_tests += n;
385                 }
386         }
387
388         reply->str_status = 0;
389         return 0;
390 }
391
392 int
393 sfw_make_session (srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
394 {
395         sfw_session_t *sn = sfw_data.fw_session;
396
397         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
398                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
399                 reply->mksn_status = EINVAL;
400                 return 0;
401         }
402
403         if (sn != NULL && !request->mksn_force) {
404                 reply->mksn_sid    = sn->sn_id;
405                 reply->mksn_status = EBUSY;
406                 strncpy(&reply->mksn_name[0], &sn->sn_name[0], LST_NAME_SIZE);
407                 return 0;
408         }
409         
410         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
411         if (sn == NULL) {
412                 CERROR ("Dropping RPC (mksn) under memory pressure.\n");
413                 return -ENOMEM;
414         }
415
416         sfw_init_session(sn, request->mksn_sid, &request->mksn_name[0]);
417
418         spin_lock(&sfw_data.fw_lock);
419
420         sfw_deactivate_session();
421         LASSERT (sfw_data.fw_session == NULL);
422         sfw_data.fw_session = sn;
423
424         spin_unlock(&sfw_data.fw_lock);
425
426         reply->mksn_status  = 0;
427         reply->mksn_sid     = sn->sn_id;
428         reply->mksn_timeout = sn->sn_timeout;
429         return 0;
430 }
431
432 int
433 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
434 {
435         sfw_session_t *sn = sfw_data.fw_session;
436
437         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
438
439         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
440                 reply->rmsn_status = EINVAL;
441                 return 0;
442         }
443
444         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
445                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
446                 return 0;
447         }
448
449         spin_lock(&sfw_data.fw_lock);
450         sfw_deactivate_session();
451         spin_unlock(&sfw_data.fw_lock);
452
453         reply->rmsn_status = 0;
454         reply->rmsn_sid    = LST_INVALID_SID;
455         LASSERT (sfw_data.fw_session == NULL);
456         return 0;
457 }
458
459 int
460 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
461 {
462         sfw_session_t *sn = sfw_data.fw_session;
463
464         if (sn == NULL) {
465                 reply->dbg_status = ESRCH;
466                 reply->dbg_sid    = LST_INVALID_SID;
467                 return 0;
468         } 
469
470         reply->dbg_status  = 0;
471         reply->dbg_sid     = sn->sn_id;      
472         reply->dbg_timeout = sn->sn_timeout;
473         strncpy(reply->dbg_name, &sn->sn_name[0], LST_NAME_SIZE);
474
475         return 0;
476 }
477
478 void
479 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
480 {
481         sfw_test_unit_t     *tsu = rpc->crpc_priv;
482         sfw_test_instance_t *tsi = tsu->tsu_instance;
483
484         /* Called with hold of tsi->tsi_lock */
485         LASSERT (list_empty(&rpc->crpc_list));
486         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
487 }
488
489 int
490 sfw_load_test (sfw_test_instance_t *tsi)
491 {
492         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
493         int              nrequired = sfw_test_buffers(tsi);
494         int              nposted;
495
496         LASSERT (tsc != NULL);
497
498         if (tsi->tsi_is_client) {
499                 tsi->tsi_ops = tsc->tsc_cli_ops;
500                 return 0;
501         }
502
503         nposted = srpc_service_add_buffers(tsc->tsc_srv_service, nrequired);
504         if (nposted != nrequired) {
505                 CWARN ("Failed to reserve enough buffers: "
506                        "service %s, %d needed, %d reserved\n",
507                        tsc->tsc_srv_service->sv_name, nrequired, nposted);
508                 srpc_service_remove_buffers(tsc->tsc_srv_service, nposted);
509                 return -ENOMEM;
510         }
511
512         CDEBUG (D_NET, "Reserved %d buffers for test %s\n",
513                 nposted, tsc->tsc_srv_service->sv_name);
514         return 0;
515 }
516
517 void
518 sfw_unload_test (sfw_test_instance_t *tsi)
519 {
520         sfw_test_case_t *tsc = sfw_find_test_case(tsi->tsi_service);
521
522         LASSERT (tsc != NULL);
523
524         if (!tsi->tsi_is_client)
525                 srpc_service_remove_buffers(tsc->tsc_srv_service,
526                                             sfw_test_buffers(tsi));
527         return;
528 }
529
530 void
531 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
532 {
533         srpc_client_rpc_t *rpc;
534         sfw_test_unit_t   *tsu;
535
536         if (!tsi->tsi_is_client) goto clean;
537
538         tsi->tsi_ops->tso_fini(tsi);
539
540         LASSERT (!tsi->tsi_stopping);
541         LASSERT (list_empty(&tsi->tsi_active_rpcs));
542         LASSERT (!sfw_test_active(tsi));
543
544         while (!list_empty(&tsi->tsi_units)) {
545                 tsu = list_entry(tsi->tsi_units.next,
546                                  sfw_test_unit_t, tsu_list);
547                 list_del(&tsu->tsu_list);
548                 LIBCFS_FREE(tsu, sizeof(*tsu));
549         }
550
551         while (!list_empty(&tsi->tsi_free_rpcs)) {
552                 rpc = list_entry(tsi->tsi_free_rpcs.next,
553                                  srpc_client_rpc_t, crpc_list);
554                 list_del(&rpc->crpc_list);
555                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
556         }
557
558 clean:
559         sfw_unload_test(tsi);
560         LIBCFS_FREE(tsi, sizeof(*tsi));
561         return;
562 }
563
564 void
565 sfw_destroy_batch (sfw_batch_t *tsb)
566 {
567         sfw_test_instance_t *tsi;
568
569         LASSERT (!sfw_batch_active(tsb));
570         LASSERT (list_empty(&tsb->bat_list));
571
572         while (!list_empty(&tsb->bat_tests)) {
573                 tsi = list_entry(tsb->bat_tests.next,
574                                  sfw_test_instance_t, tsi_list);
575                 list_del_init(&tsi->tsi_list);
576                 sfw_destroy_test_instance(tsi);
577         }
578
579         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
580         return;
581 }
582
583 void
584 sfw_destroy_session (sfw_session_t *sn)
585 {
586         sfw_batch_t *batch;
587
588         LASSERT (list_empty(&sn->sn_list));
589         LASSERT (sn != sfw_data.fw_session);
590
591         while (!list_empty(&sn->sn_batches)) {
592                 batch = list_entry(sn->sn_batches.next,
593                                    sfw_batch_t, bat_list);
594                 list_del_init(&batch->bat_list);
595                 sfw_destroy_batch(batch);
596         }
597
598         LIBCFS_FREE(sn, sizeof(*sn));
599         atomic_dec(&sfw_data.fw_nzombies);
600         return;
601 }
602
603 void
604 sfw_unpack_test_req (srpc_msg_t *msg)
605 {
606         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
607
608         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
609         LASSERT (req->tsr_is_client);
610
611         if (msg->msg_magic == SRPC_MSG_MAGIC)
612                 return; /* no flipping needed */
613
614         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
615
616         if (req->tsr_service == SRPC_SERVICE_BRW) {
617                 test_bulk_req_t *bulk = &req->tsr_u.bulk;
618
619                 __swab32s(&bulk->blk_opc);
620                 __swab32s(&bulk->blk_npg);
621                 __swab32s(&bulk->blk_flags);
622                 return;
623         }
624
625         if (req->tsr_service == SRPC_SERVICE_PING) {
626                 test_ping_req_t *ping = &req->tsr_u.ping;
627
628                 __swab32s(&ping->png_size);
629                 __swab32s(&ping->png_flags);
630                 return;
631         }
632
633         LBUG ();
634         return;
635 }
636
637 int
638 sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
639 {
640         srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
641         srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
642         srpc_bulk_t         *bk = rpc->srpc_bulk;
643         int                  ndest = req->tsr_ndest;
644         sfw_test_unit_t     *tsu;
645         sfw_test_instance_t *tsi;
646         int                  i;
647         int                  rc;
648
649         LIBCFS_ALLOC(tsi, sizeof(*tsi));
650         if (tsi == NULL) {
651                 CERROR ("Can't allocate test instance for batch: "LPU64"\n",
652                         tsb->bat_id.bat_id);
653                 return -ENOMEM;
654         }
655
656         memset(tsi, 0, sizeof(*tsi));
657         spin_lock_init(&tsi->tsi_lock);
658         atomic_set(&tsi->tsi_nactive, 0);
659         CFS_INIT_LIST_HEAD(&tsi->tsi_units);
660         CFS_INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
661         CFS_INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
662
663         tsi->tsi_stopping      = 0;
664         tsi->tsi_batch         = tsb;
665         tsi->tsi_loop          = req->tsr_loop;
666         tsi->tsi_concur        = req->tsr_concur;
667         tsi->tsi_service       = req->tsr_service;
668         tsi->tsi_is_client     = !!(req->tsr_is_client);
669         tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
670
671         rc = sfw_load_test(tsi);
672         if (rc != 0) {
673                 LIBCFS_FREE(tsi, sizeof(*tsi));
674                 return rc;
675         }
676
677         LASSERT (!sfw_batch_active(tsb));
678
679         if (!tsi->tsi_is_client) {
680                 /* it's test server, just add it to tsb */
681                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
682                 return 0;
683         }
684
685         LASSERT (bk != NULL);
686 #ifndef __KERNEL__
687         LASSERT (bk->bk_pages != NULL);
688 #endif
689         LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= ndest);
690         LASSERT (bk->bk_len >= sizeof(lnet_process_id_t) * ndest);
691
692         sfw_unpack_test_req(msg);
693         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
694
695         for (i = 0; i < ndest; i++) {
696                 lnet_process_id_t *dests;
697                 lnet_process_id_t  id;
698                 int                j;
699
700 #ifdef __KERNEL__
701                 dests = cfs_page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
702                 LASSERT (dests != NULL);  /* my pages are within KVM always */
703 #else
704                 dests = cfs_page_address(bk->bk_pages[i / SFW_ID_PER_PAGE]);
705 #endif
706                 id = dests[i % SFW_ID_PER_PAGE];
707                 if (msg->msg_magic != SRPC_MSG_MAGIC)
708                         sfw_unpack_id(id);
709
710                 for (j = 0; j < tsi->tsi_concur; j++) {
711                         LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
712                         if (tsu == NULL) {
713                                 rc = -ENOMEM;
714                                 CERROR ("Can't allocate tsu for %d\n",
715                                         tsi->tsi_service);
716                                 goto error;
717                         }
718
719                         tsu->tsu_dest     = id;
720                         tsu->tsu_instance = tsi;
721                         tsu->tsu_private  = NULL;
722                         list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
723                 }
724         }
725
726         rc = tsi->tsi_ops->tso_init(tsi);
727         if (rc == 0) {
728                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
729                 return 0;
730         }
731
732 error:
733         LASSERT (rc != 0);
734         sfw_destroy_test_instance(tsi);
735         return rc;
736 }
737
738 static void
739 sfw_test_unit_done (sfw_test_unit_t *tsu)
740 {
741         sfw_test_instance_t *tsi = tsu->tsu_instance;
742         sfw_batch_t         *tsb = tsi->tsi_batch;
743         sfw_session_t       *sn = tsb->bat_session;
744
745         LASSERT (sfw_test_active(tsi));
746
747         if (!atomic_dec_and_test(&tsi->tsi_nactive))
748                 return;
749         
750         /* the test instance is done */
751         spin_lock(&tsi->tsi_lock);
752
753         tsi->tsi_stopping = 0;
754
755         spin_unlock(&tsi->tsi_lock);
756
757         spin_lock(&sfw_data.fw_lock);
758
759         if (!atomic_dec_and_test(&tsb->bat_nactive) || /* tsb still active */
760             sn == sfw_data.fw_session) {               /* sn also active */
761                 spin_unlock(&sfw_data.fw_lock);
762                 return;
763         }
764         
765         LASSERT (!list_empty(&sn->sn_list)); /* I'm a zombie! */
766
767         list_for_each_entry (tsb, &sn->sn_batches, bat_list) {
768                 if (sfw_batch_active(tsb)) {
769                         spin_unlock(&sfw_data.fw_lock);
770                         return;
771                 }
772         }
773
774         list_del_init(&sn->sn_list);
775         spin_unlock(&sfw_data.fw_lock);
776
777         sfw_destroy_session(sn);
778         return;
779 }
780
781 void
782 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
783 {
784         sfw_test_unit_t     *tsu = rpc->crpc_priv;
785         sfw_test_instance_t *tsi = tsu->tsu_instance;
786         int                  done = 0;
787
788         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
789                       
790         spin_lock(&tsi->tsi_lock);
791
792         LASSERT (sfw_test_active(tsi));
793         LASSERT (!list_empty(&rpc->crpc_list));
794
795         list_del_init(&rpc->crpc_list);
796
797         /* batch is stopping or loop is done or get error */
798         if (tsi->tsi_stopping ||
799             tsu->tsu_loop == 0 ||
800             (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
801                 done = 1;
802
803         /* dec ref for poster */
804         srpc_client_rpc_decref(rpc);
805
806         spin_unlock(&tsi->tsi_lock);
807
808         if (!done) {
809                 swi_schedule_workitem(&tsu->tsu_worker);
810                 return;
811         }
812
813         sfw_test_unit_done(tsu);
814         return;
815 }
816
817 int
818 sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer,
819                      int nblk, int blklen, srpc_client_rpc_t **rpcpp)
820 {
821         srpc_client_rpc_t   *rpc = NULL;
822         sfw_test_instance_t *tsi = tsu->tsu_instance;
823         
824         spin_lock(&tsi->tsi_lock);
825
826         LASSERT (sfw_test_active(tsi));
827
828         if (!list_empty(&tsi->tsi_free_rpcs)) {
829                 /* pick request from buffer */
830                 rpc = list_entry(tsi->tsi_free_rpcs.next,
831                                  srpc_client_rpc_t, crpc_list);
832                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
833                 list_del_init(&rpc->crpc_list);
834
835                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
836                                      blklen, sfw_test_rpc_done,
837                                      sfw_test_rpc_fini, tsu);
838         }
839
840         spin_unlock(&tsi->tsi_lock);
841         
842         if (rpc == NULL)
843                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
844                                              blklen, sfw_test_rpc_done, 
845                                              sfw_test_rpc_fini, tsu);
846         if (rpc == NULL) {
847                 CERROR ("Can't create rpc for test %d\n", tsi->tsi_service);
848                 return -ENOMEM;
849         }
850
851         *rpcpp = rpc;
852         return 0;
853 }
854
855 int
856 sfw_run_test (swi_workitem_t *wi)
857 {
858         sfw_test_unit_t     *tsu = wi->wi_data;
859         sfw_test_instance_t *tsi = tsu->tsu_instance;
860         srpc_client_rpc_t   *rpc = NULL;
861
862         LASSERT (wi == &tsu->tsu_worker);
863
864         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
865                 LASSERT (rpc == NULL);
866                 goto test_done;
867         }
868
869         LASSERT (rpc != NULL);
870
871         spin_lock(&tsi->tsi_lock);
872
873         if (tsi->tsi_stopping) {
874                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
875                 spin_unlock(&tsi->tsi_lock);
876                 goto test_done;
877         }
878
879         if (tsu->tsu_loop > 0)
880                 tsu->tsu_loop--;
881
882         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
883         spin_unlock(&tsi->tsi_lock);
884
885         rpc->crpc_timeout = SFW_TEST_RPC_TIMEOUT;
886
887         spin_lock(&rpc->crpc_lock);
888         srpc_post_rpc(rpc);
889         spin_unlock(&rpc->crpc_lock);
890         return 0;
891
892 test_done:
893         /*
894          * No one can schedule me now since:
895          * - previous RPC, if any, has done and
896          * - no new RPC is initiated.
897          * - my batch is still active; no one can run it again now.
898          * Cancel pending schedules and prevent future schedule attempts:
899          */
900         swi_kill_workitem(wi);
901         sfw_test_unit_done(tsu);
902         return 1;
903 }
904
905 int
906 sfw_run_batch (sfw_batch_t *tsb)
907 {
908         swi_workitem_t      *wi;
909         sfw_test_unit_t     *tsu;
910         sfw_test_instance_t *tsi;
911
912         if (sfw_batch_active(tsb)) {
913                 CDEBUG (D_NET, "Can't start active batch: "LPU64" (%d)\n",
914                         tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
915                 return -EPERM;
916         }
917
918         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
919                 if (!tsi->tsi_is_client) /* skip server instances */
920                         continue;
921
922                 LASSERT (!tsi->tsi_stopping);
923                 LASSERT (!sfw_test_active(tsi));
924
925                 atomic_inc(&tsb->bat_nactive);
926
927                 list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) {
928                         atomic_inc(&tsi->tsi_nactive);
929                         tsu->tsu_loop = tsi->tsi_loop;
930                         wi = &tsu->tsu_worker;
931                         swi_init_workitem(wi, tsu, sfw_run_test);
932                         swi_schedule_workitem(wi);
933                 }
934         }
935
936         return 0;
937 }
938
939 int
940 sfw_stop_batch (sfw_batch_t *tsb, int force)
941 {
942         sfw_test_instance_t *tsi;
943         srpc_client_rpc_t   *rpc;
944
945         if (!sfw_batch_active(tsb))
946                 return -EPERM;
947
948         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
949                 spin_lock(&tsi->tsi_lock);
950
951                 if (!tsi->tsi_is_client ||
952                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
953                         spin_unlock(&tsi->tsi_lock);
954                         continue;
955                 }
956
957                 tsi->tsi_stopping = 1;
958
959                 if (!force) {
960                         spin_unlock(&tsi->tsi_lock);
961                         continue;
962                 }
963
964                 /* abort launched rpcs in the test */
965                 list_for_each_entry (rpc, &tsi->tsi_active_rpcs, crpc_list) {
966                         spin_lock(&rpc->crpc_lock);
967
968                         srpc_abort_rpc(rpc, -EINTR);
969
970                         spin_unlock(&rpc->crpc_lock);
971                 }
972
973                 spin_unlock(&tsi->tsi_lock);
974         }
975
976         return 0;
977 }
978
979 int
980 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
981 {
982         sfw_test_instance_t *tsi;
983
984         if (testidx < 0)
985                 return -EINVAL;
986
987         if (testidx == 0) {
988                 reply->bar_active = atomic_read(&tsb->bat_nactive);
989                 return 0;
990         }
991
992         list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) {
993                 if (testidx-- > 1)
994                         continue;
995
996                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
997                 return 0;
998         }
999
1000         return -ENOENT;
1001 }
1002
1003 void
1004 sfw_free_pages (srpc_server_rpc_t *rpc)
1005 {
1006         srpc_free_bulk(rpc->srpc_bulk);
1007         rpc->srpc_bulk = NULL;
1008 }
1009
1010 int
1011 sfw_alloc_pages (srpc_server_rpc_t *rpc, int npages, int sink)
1012 {
1013         LASSERT (rpc->srpc_bulk == NULL);
1014         LASSERT (npages > 0 && npages <= LNET_MAX_IOV);
1015
1016         rpc->srpc_bulk = srpc_alloc_bulk(npages, sink);
1017         if (rpc->srpc_bulk == NULL) return -ENOMEM;
1018
1019         return 0;
1020 }
1021
1022 int
1023 sfw_add_test (srpc_server_rpc_t *rpc)
1024 {
1025         sfw_session_t     *sn = sfw_data.fw_session;
1026         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1027         srpc_test_reqst_t *request;
1028         int                rc;
1029         sfw_batch_t       *bat;
1030
1031         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1032         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1033
1034         if (request->tsr_loop == 0 ||
1035             request->tsr_concur == 0 ||
1036             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1037             request->tsr_ndest > SFW_MAX_NDESTS ||
1038             (request->tsr_is_client && request->tsr_ndest == 0) ||
1039             request->tsr_concur > SFW_MAX_CONCUR ||
1040             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1041             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1042                 reply->tsr_status = EINVAL;
1043                 return 0;
1044         }
1045
1046         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1047             sfw_find_test_case(request->tsr_service) == NULL) {
1048                 reply->tsr_status = ENOENT;
1049                 return 0;
1050         }
1051
1052         bat = sfw_bid2batch(request->tsr_bid);
1053         if (bat == NULL) {
1054                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
1055                         rpc->srpc_service->sv_name,
1056                         libcfs_id2str(rpc->srpc_peer));
1057                 return -ENOMEM;
1058         }
1059
1060         if (sfw_batch_active(bat)) {
1061                 reply->tsr_status = EBUSY;
1062                 return 0;
1063         }
1064
1065         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1066                 /* rpc will be resumed later in sfw_bulk_ready */
1067                 return sfw_alloc_pages(rpc,
1068                                        sfw_id_pages(request->tsr_ndest), 1);
1069         }
1070
1071         rc = sfw_add_test_instance(bat, rpc);
1072         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1073                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1074                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1075                 request->tsr_is_client ? "client" : "server",
1076                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1077
1078         reply->tsr_status = (rc < 0) ? -rc : rc;
1079         return 0;
1080 }
1081
1082 int
1083 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1084 {
1085         sfw_session_t *sn = sfw_data.fw_session;
1086         int            rc = 0;
1087         sfw_batch_t   *bat;
1088
1089         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1090
1091         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1092                 reply->bar_status = ESRCH;
1093                 return 0;
1094         }
1095
1096         bat = sfw_find_batch(request->bar_bid);
1097         if (bat == NULL) {
1098                 reply->bar_status = ENOENT;
1099                 return 0;
1100         }
1101
1102         switch (request->bar_opc) {
1103         case SRPC_BATCH_OPC_RUN:
1104                 rc = sfw_run_batch(bat);
1105                 break;
1106
1107         case SRPC_BATCH_OPC_STOP:
1108                 rc = sfw_stop_batch(bat, request->bar_arg);
1109                 break;
1110
1111         case SRPC_BATCH_OPC_QUERY:
1112                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1113                 break;
1114
1115         default:
1116                 return -EINVAL; /* drop it */
1117         }
1118
1119         reply->bar_status = (rc < 0) ? -rc : rc;
1120         return 0;
1121 }
1122
1123 int
1124 sfw_handle_server_rpc (srpc_server_rpc_t *rpc)
1125 {
1126         srpc_service_t *sv = rpc->srpc_service;
1127         srpc_msg_t     *reply = &rpc->srpc_replymsg;
1128         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1129         int             rc = 0;
1130
1131         LASSERT (sfw_data.fw_active_srpc == NULL);
1132         LASSERT (sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1133
1134         spin_lock(&sfw_data.fw_lock);
1135
1136         if (sfw_data.fw_shuttingdown) {
1137                 spin_unlock(&sfw_data.fw_lock);
1138                 return -ESHUTDOWN;
1139         }
1140
1141         /* Remove timer to avoid racing with it or expiring active session */
1142         if (sfw_del_session_timer() != 0) {
1143                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer.",
1144                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1145                 spin_unlock(&sfw_data.fw_lock);
1146                 return -EAGAIN;
1147         }
1148
1149         sfw_data.fw_active_srpc = rpc;
1150         spin_unlock(&sfw_data.fw_lock);
1151
1152         sfw_unpack_message(request);
1153         LASSERT (request->msg_type == srpc_service2request(sv->sv_id));
1154
1155         switch(sv->sv_id) {
1156         default:
1157                 LBUG ();
1158         case SRPC_SERVICE_TEST:
1159                 rc = sfw_add_test(rpc);
1160                 break;
1161
1162         case SRPC_SERVICE_BATCH:
1163                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1164                                        &reply->msg_body.bat_reply);
1165                 break;
1166
1167         case SRPC_SERVICE_QUERY_STAT:
1168                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1169                                    &reply->msg_body.stat_reply);
1170                 break;
1171
1172         case SRPC_SERVICE_DEBUG:
1173                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1174                                        &reply->msg_body.dbg_reply);
1175                 break;
1176
1177         case SRPC_SERVICE_MAKE_SESSION:
1178                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1179                                       &reply->msg_body.mksn_reply);
1180                 break;
1181
1182         case SRPC_SERVICE_REMOVE_SESSION:
1183                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1184                                         &reply->msg_body.rmsn_reply);
1185                 break;
1186         }
1187
1188         rpc->srpc_done = sfw_server_rpc_done;
1189         spin_lock(&sfw_data.fw_lock);
1190
1191 #ifdef __KERNEL__
1192         if (!sfw_data.fw_shuttingdown)
1193                 sfw_add_session_timer();
1194 #else
1195         LASSERT (!sfw_data.fw_shuttingdown);
1196         sfw_add_session_timer();
1197 #endif
1198
1199         sfw_data.fw_active_srpc = NULL;
1200         spin_unlock(&sfw_data.fw_lock);
1201         return rc;
1202 }
1203
1204 int
1205 sfw_bulk_ready (srpc_server_rpc_t *rpc, int status)
1206 {
1207         srpc_service_t *sv = rpc->srpc_service;
1208         int             rc;
1209
1210         LASSERT (rpc->srpc_bulk != NULL);
1211         LASSERT (sv->sv_id == SRPC_SERVICE_TEST);
1212         LASSERT (sfw_data.fw_active_srpc == NULL);
1213         LASSERT (rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1214
1215         spin_lock(&sfw_data.fw_lock);
1216
1217         if (status != 0) {
1218                 CERROR ("Bulk transfer failed for RPC: "
1219                         "service %s, peer %s, status %d\n",
1220                         sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1221                 spin_unlock(&sfw_data.fw_lock);
1222                 return -EIO;
1223         }
1224
1225         if (sfw_data.fw_shuttingdown) {
1226                 spin_unlock(&sfw_data.fw_lock);
1227                 return -ESHUTDOWN;
1228         }
1229
1230         if (sfw_del_session_timer() != 0) {
1231                 CERROR ("Dropping RPC (%s) from %s: racing with expiry timer",
1232                         sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1233                 spin_unlock(&sfw_data.fw_lock);
1234                 return -EAGAIN;
1235         }
1236
1237         sfw_data.fw_active_srpc = rpc;
1238         spin_unlock(&sfw_data.fw_lock);
1239
1240         rc = sfw_add_test(rpc);
1241
1242         spin_lock(&sfw_data.fw_lock);
1243
1244 #ifdef __KERNEL__
1245         if (!sfw_data.fw_shuttingdown)
1246                 sfw_add_session_timer();
1247 #else
1248         LASSERT (!sfw_data.fw_shuttingdown);
1249         sfw_add_session_timer();
1250 #endif
1251
1252         sfw_data.fw_active_srpc = NULL;
1253         spin_unlock(&sfw_data.fw_lock);
1254         return rc;
1255 }
1256
1257 srpc_client_rpc_t *
1258 sfw_create_rpc (lnet_process_id_t peer, int service,
1259                 int nbulkiov, int bulklen,
1260                 void (*done) (srpc_client_rpc_t *), void *priv)
1261 {
1262         srpc_client_rpc_t *rpc;
1263
1264         spin_lock(&sfw_data.fw_lock);
1265
1266         LASSERT (!sfw_data.fw_shuttingdown);
1267         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1268
1269         if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1270                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1271                                  srpc_client_rpc_t, crpc_list);
1272                 list_del(&rpc->crpc_list);
1273                 spin_unlock(&sfw_data.fw_lock);
1274
1275                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1276                                      done, sfw_client_rpc_fini, priv);
1277                 return rpc;
1278         }
1279
1280         spin_unlock(&sfw_data.fw_lock);
1281
1282         rpc = srpc_create_client_rpc(peer, service, nbulkiov, bulklen, done,
1283                                      nbulkiov != 0 ? NULL : sfw_client_rpc_fini,
1284                                      priv);
1285         return rpc;
1286 }
1287
1288 void
1289 sfw_unpack_message (srpc_msg_t *msg)
1290 {
1291         if (msg->msg_magic == SRPC_MSG_MAGIC)
1292                 return; /* no flipping needed */
1293
1294         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1295
1296         __swab32s(&msg->msg_type);
1297
1298         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1299                 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1300
1301                 __swab32s(&req->str_type);
1302                 __swab64s(&req->str_rpyid);
1303                 sfw_unpack_sid(req->str_sid);
1304                 return;
1305         }
1306
1307         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1308                 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1309
1310                 __swab32s(&rep->str_status);
1311                 sfw_unpack_sid(rep->str_sid);
1312                 sfw_unpack_fw_counters(rep->str_fw);
1313                 sfw_unpack_rpc_counters(rep->str_rpc);
1314                 sfw_unpack_lnet_counters(rep->str_lnet);
1315                 return;
1316         }
1317
1318         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1319                 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1320
1321                 __swab64s(&req->mksn_rpyid);
1322                 __swab32s(&req->mksn_force);
1323                 sfw_unpack_sid(req->mksn_sid);
1324                 return;
1325         }
1326
1327         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1328                 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1329
1330                 __swab32s(&rep->mksn_status);
1331                 __swab32s(&rep->mksn_timeout);
1332                 sfw_unpack_sid(rep->mksn_sid);
1333                 return;
1334         }
1335
1336         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1337                 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1338
1339                 __swab64s(&req->rmsn_rpyid);
1340                 sfw_unpack_sid(req->rmsn_sid);
1341                 return;
1342         }
1343
1344         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1345                 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1346
1347                 __swab32s(&rep->rmsn_status);
1348                 sfw_unpack_sid(rep->rmsn_sid);
1349                 return;
1350         }
1351
1352         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1353                 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1354
1355                 __swab64s(&req->dbg_rpyid);
1356                 __swab32s(&req->dbg_flags);
1357                 sfw_unpack_sid(req->dbg_sid);
1358                 return;
1359         }
1360
1361         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1362                 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1363
1364                 __swab32s(&rep->dbg_nbatch);
1365                 __swab32s(&rep->dbg_timeout);
1366                 sfw_unpack_sid(rep->dbg_sid);
1367                 return;
1368         }
1369
1370         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1371                 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1372
1373                 __swab32s(&req->bar_opc);
1374                 __swab64s(&req->bar_rpyid);
1375                 __swab32s(&req->bar_testidx);
1376                 __swab32s(&req->bar_arg);
1377                 sfw_unpack_sid(req->bar_sid);
1378                 __swab64s(&req->bar_bid.bat_id);
1379                 return;
1380         }
1381
1382         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1383                 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1384
1385                 __swab32s(&rep->bar_status);
1386                 sfw_unpack_sid(rep->bar_sid);
1387                 return;
1388         }
1389
1390         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1391                 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1392
1393                 __swab64s(&req->tsr_rpyid);
1394                 __swab64s(&req->tsr_bulkid);
1395                 __swab32s(&req->tsr_loop);
1396                 __swab32s(&req->tsr_ndest);
1397                 __swab32s(&req->tsr_concur);
1398                 __swab32s(&req->tsr_service);
1399                 sfw_unpack_sid(req->tsr_sid);
1400                 __swab64s(&req->tsr_bid.bat_id);
1401                 return;
1402         }
1403
1404         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1405                 srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1406
1407                 __swab32s(&rep->tsr_status);
1408                 sfw_unpack_sid(rep->tsr_sid);
1409                 return;
1410         }
1411
1412         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1413                 srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1414
1415                 __swab64s(&req->join_rpyid);
1416                 sfw_unpack_sid(req->join_sid);
1417                 return;
1418         }
1419
1420         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1421                 srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1422
1423                 __swab32s(&rep->join_status);
1424                 __swab32s(&rep->join_timeout);
1425                 sfw_unpack_sid(rep->join_sid);
1426                 return;
1427         }
1428
1429         LBUG ();
1430         return;
1431 }
1432
1433 void
1434 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1435 {
1436         LASSERT (atomic_read(&rpc->crpc_refcount) > 0);
1437         LASSERT (rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1438
1439         spin_lock(&rpc->crpc_lock);
1440         srpc_abort_rpc(rpc, -EINTR);
1441         spin_unlock(&rpc->crpc_lock);
1442         return;
1443 }
1444
1445 void
1446 sfw_post_rpc (srpc_client_rpc_t *rpc)
1447 {
1448         spin_lock(&rpc->crpc_lock);
1449
1450         LASSERT (!rpc->crpc_closed);
1451         LASSERT (!rpc->crpc_aborted);
1452         LASSERT (list_empty(&rpc->crpc_list));
1453         LASSERT (!sfw_data.fw_shuttingdown);
1454
1455         rpc->crpc_timeout = SFW_CLIENT_RPC_TIMEOUT;
1456         srpc_post_rpc(rpc);
1457
1458         spin_unlock(&rpc->crpc_lock);
1459         return;
1460 }
1461
1462 static srpc_service_t sfw_services[] = 
1463 {
1464         {
1465                 .sv_name = "debug",
1466                 .sv_id   = SRPC_SERVICE_DEBUG,
1467         },
1468         {
1469                 .sv_name = "query stats",
1470                 .sv_id   = SRPC_SERVICE_QUERY_STAT,
1471         },
1472         {
1473                 .sv_name = "make sessin",
1474                 .sv_id   = SRPC_SERVICE_MAKE_SESSION,
1475         },
1476         {
1477                 .sv_name = "remove session",
1478                 .sv_id   = SRPC_SERVICE_REMOVE_SESSION,
1479         },
1480         {
1481                 .sv_name = "batch service",
1482                 .sv_id   = SRPC_SERVICE_BATCH,
1483         },
1484         {
1485                 .sv_name = "test service",
1486                 .sv_id   = SRPC_SERVICE_TEST,
1487         },
1488         {       .sv_name = NULL, }
1489 };
1490
1491 extern sfw_test_client_ops_t ping_test_client;
1492 extern srpc_service_t        ping_test_service;
1493
1494 extern sfw_test_client_ops_t brw_test_client;
1495 extern srpc_service_t        brw_test_service;
1496
1497 int
1498 sfw_startup (void)
1499 {
1500         int              i;
1501         int              rc;
1502         int              error;
1503         srpc_service_t  *sv;
1504         sfw_test_case_t *tsc;
1505
1506 #ifndef __KERNEL__
1507         char *s;
1508
1509         s = getenv("SESSION_TIMEOUT");
1510         session_timeout = s != NULL ? atoi(s) : session_timeout;
1511
1512         s = getenv("BRW_INJECT_ERRORS");
1513         brw_inject_errors = s != NULL ? atoi(s) : brw_inject_errors;
1514 #endif
1515
1516         if (session_timeout < 0) {
1517                 CERROR ("Session timeout must be non-negative: %d\n",
1518                         session_timeout);
1519                 return -EINVAL;
1520         }
1521
1522         if (session_timeout == 0)
1523                 CWARN ("Zero session_timeout specified "
1524                        "- test sessions never expire.\n");
1525
1526         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1527
1528         sfw_data.fw_session     = NULL;
1529         sfw_data.fw_active_srpc = NULL;
1530         spin_lock_init(&sfw_data.fw_lock);
1531         atomic_set(&sfw_data.fw_nzombies, 0);
1532         CFS_INIT_LIST_HEAD(&sfw_data.fw_tests);
1533         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1534         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1535
1536         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1537         LASSERT (rc == 0);
1538         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1539         LASSERT (rc == 0);
1540
1541         error = 0;
1542         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1543                 sv = tsc->tsc_srv_service;
1544                 sv->sv_concur = SFW_TEST_CONCURRENCY;
1545
1546                 rc = srpc_add_service(sv);
1547                 LASSERT (rc != -EBUSY);
1548                 if (rc != 0) {
1549                         CWARN ("Failed to add %s service: %d\n",
1550                                sv->sv_name, rc);
1551                         error = rc;
1552                 }
1553         }
1554
1555         for (i = 0; ; i++) {
1556                 sv = &sfw_services[i];
1557                 if (sv->sv_name == NULL) break;
1558
1559                 sv->sv_bulk_ready = NULL;
1560                 sv->sv_handler    = sfw_handle_server_rpc;
1561                 sv->sv_concur     = SFW_SERVICE_CONCURRENCY;
1562                 if (sv->sv_id == SRPC_SERVICE_TEST)
1563                         sv->sv_bulk_ready = sfw_bulk_ready;
1564
1565                 rc = srpc_add_service(sv);
1566                 LASSERT (rc != -EBUSY);
1567                 if (rc != 0) {
1568                         CWARN ("Failed to add %s service: %d\n",
1569                                sv->sv_name, rc);
1570                         error = rc;
1571                 }
1572
1573                 /* about to sfw_shutdown, no need to add buffer */
1574                 if (error) continue; 
1575
1576                 rc = srpc_service_add_buffers(sv, SFW_POST_BUFFERS);
1577                 if (rc != SFW_POST_BUFFERS) {
1578                         CWARN ("Failed to reserve enough buffers: "
1579                                "service %s, %d needed, %d reserved\n",
1580                                sv->sv_name, SFW_POST_BUFFERS, rc);
1581                         error = -ENOMEM;
1582                 }
1583         }
1584
1585         if (error != 0)
1586                 sfw_shutdown();
1587         return error;
1588 }
1589
1590 void
1591 sfw_shutdown (void)
1592 {
1593         srpc_service_t  *sv;
1594         sfw_test_case_t *tsc;
1595         int              i;
1596
1597         spin_lock(&sfw_data.fw_lock);
1598
1599         sfw_data.fw_shuttingdown = 1;
1600 #ifdef __KERNEL__
1601         lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1602                        "waiting for active RPC to finish.\n");
1603 #else
1604         LASSERT (sfw_data.fw_active_srpc == NULL);
1605 #endif
1606
1607         if (sfw_del_session_timer() != 0)
1608                 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1609                                "waiting for session timer to explode.\n");
1610
1611         sfw_deactivate_session();
1612         lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1613                        sfw_data.fw_lock,
1614                        "waiting for %d zombie sessions to die.\n",
1615                        atomic_read(&sfw_data.fw_nzombies));
1616
1617         spin_unlock(&sfw_data.fw_lock);
1618
1619         for (i = 0; ; i++) {
1620                 sv = &sfw_services[i];
1621                 if (sv->sv_name == NULL)
1622                         break;
1623
1624                 srpc_shutdown_service(sv);
1625                 srpc_remove_service(sv);
1626         }
1627
1628         list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) {
1629                 sv = tsc->tsc_srv_service;
1630                 srpc_shutdown_service(sv);
1631                 srpc_remove_service(sv);
1632         }
1633
1634         while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1635                 srpc_client_rpc_t *rpc;
1636
1637                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next, 
1638                                  srpc_client_rpc_t, crpc_list);
1639                 list_del(&rpc->crpc_list);
1640
1641                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1642         }
1643
1644         for (i = 0; ; i++) {
1645                 sv = &sfw_services[i];
1646                 if (sv->sv_name == NULL)
1647                         break;
1648
1649                 srpc_wait_service_shutdown(sv);
1650         }
1651
1652         while (!list_empty(&sfw_data.fw_tests)) {
1653                 tsc = list_entry(sfw_data.fw_tests.next,
1654                                  sfw_test_case_t, tsc_list);
1655                 
1656                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1657
1658                 list_del(&tsc->tsc_list);
1659                 LIBCFS_FREE(tsc, sizeof(*tsc));
1660         }
1661
1662         return;
1663 }