Whamcloud - gitweb
LU-6977 lod: do_index_try should be called first
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/framework.c
37  *
38  * Author: Isaac Huang <isaac@clusterfs.com>
39  * Author: Liang Zhen  <liangzhen@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LNET
43
44 #include "selftest.h"
45
/* Placeholder session id (wildcard NID, stamp -1); replies in this file
 * carry it when there is no current session to report. */
lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};

/* Default lifetime of a session; sfw_init_session() copies it into
 * sn_timeout, and a value of 0 means no session timer is armed. */
static int session_timeout = 100;
CFS_MODULE_PARM(session_timeout, "i", int, 0444,
                "test session timeout in seconds (100 by default, 0 == never)");

/* RPC timeout module parameter (seconds); see the parameter description. */
static int rpc_timeout = 64;
CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
                "rpc timeout in seconds (64 by default, 0 == never)");
55
/*
 * In-place byte-swap helpers.  Each macro swaps every field of the given
 * structure (passed as an lvalue, not a pointer); in this file they are
 * applied when a message's magic shows it was sent in the opposite
 * byte order.
 */

/* swap a process id: 64-bit nid and 32-bit pid */
#define sfw_unpack_id(id)               \
do {                                    \
        __swab64s(&(id).nid);           \
        __swab32s(&(id).pid);           \
} while (0)

/* swap a session id: 64-bit nid and 64-bit stamp */
#define sfw_unpack_sid(sid)             \
do {                                    \
        __swab64s(&(sid).ses_nid);      \
        __swab64s(&(sid).ses_stamp);    \
} while (0)

/* swap the framework counters (all 32-bit) */
#define sfw_unpack_fw_counters(fc)        \
do {                                      \
        __swab32s(&(fc).running_ms);      \
        __swab32s(&(fc).active_batches);  \
        __swab32s(&(fc).zombie_sessions); \
        __swab32s(&(fc).brw_errors);      \
        __swab32s(&(fc).ping_errors);     \
} while (0)

/* swap the RPC counters: 32-bit event counts, 64-bit bulk byte counts */
#define sfw_unpack_rpc_counters(rc)     \
do {                                    \
        __swab32s(&(rc).errors);        \
        __swab32s(&(rc).rpcs_sent);     \
        __swab32s(&(rc).rpcs_rcvd);     \
        __swab32s(&(rc).rpcs_dropped);  \
        __swab32s(&(rc).rpcs_expired);  \
        __swab64s(&(rc).bulk_get);      \
        __swab64s(&(rc).bulk_put);      \
} while (0)

/* swap the LNet counters: 32-bit message counts, 64-bit lengths */
#define sfw_unpack_lnet_counters(lc)    \
do {                                    \
        __swab32s(&(lc).errors);        \
        __swab32s(&(lc).msgs_max);      \
        __swab32s(&(lc).msgs_alloc);    \
        __swab32s(&(lc).send_count);    \
        __swab32s(&(lc).recv_count);    \
        __swab32s(&(lc).drop_count);    \
        __swab32s(&(lc).route_count);   \
        __swab64s(&(lc).send_length);   \
        __swab64s(&(lc).recv_length);   \
        __swab64s(&(lc).drop_length);   \
        __swab64s(&(lc).route_length);  \
} while (0)

/* a test instance / batch is "active" while its nactive counter is non-zero */
#define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
#define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
105
/*
 * Global state of the selftest framework (single instance: sfw_data).
 * fw_lock serialises updates to the session pointer and the zombie lists.
 */
static struct smoketest_framework {
        /* RPCs to be recycled */
        struct list_head        fw_zombie_rpcs;
        /* stopping sessions */
        struct list_head        fw_zombie_sessions;
        /* registered test cases */
        struct list_head        fw_tests;
        /* # zombie sessions */
        atomic_t                fw_nzombies;
        /* serialise */
        spinlock_t              fw_lock;
        /* _the_ session */
        sfw_session_t           *fw_session;
        /* shutdown in progress */
        int                     fw_shuttingdown;
        /* running RPC */
        srpc_server_rpc_t       *fw_active_srpc;
} sfw_data;
124
/* forward declarations: both functions are called before their definitions */
int sfw_stop_batch (sfw_batch_t *tsb, int force);
void sfw_destroy_session (sfw_session_t *sn);
128
129 static inline sfw_test_case_t *
130 sfw_find_test_case(int id)
131 {
132         sfw_test_case_t *tsc;
133
134         LASSERT(id <= SRPC_SERVICE_MAX_ID);
135         LASSERT(id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
136
137         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
138                 if (tsc->tsc_srv_service->sv_id == id)
139                         return tsc;
140         }
141
142         return NULL;
143 }
144
145 static int
146 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
147 {
148         sfw_test_case_t *tsc;
149
150         if (sfw_find_test_case(service->sv_id) != NULL) {
151                 CERROR ("Failed to register test %s (%d)\n",
152                         service->sv_name, service->sv_id);
153                 return -EEXIST;
154         }
155
156         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
157         if (tsc == NULL)
158                 return -ENOMEM;
159
160         memset(tsc, 0, sizeof(sfw_test_case_t));
161         tsc->tsc_cli_ops     = cliops;
162         tsc->tsc_srv_service = service;
163
164         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
165         return 0;
166 }
167
168 static void
169 sfw_add_session_timer (void)
170 {
171         sfw_session_t *sn = sfw_data.fw_session;
172         stt_timer_t   *timer = &sn->sn_timer;
173
174         LASSERT (!sfw_data.fw_shuttingdown);
175
176         if (sn == NULL || sn->sn_timeout == 0)
177                 return;
178
179         LASSERT (!sn->sn_timer_active);
180
181         sn->sn_timer_active = 1;
182         timer->stt_expires = cfs_time_add(sn->sn_timeout,
183                                           cfs_time_current_sec());
184         stt_add_timer(timer);
185         return;
186 }
187
188 static int
189 sfw_del_session_timer (void)
190 {
191         sfw_session_t *sn = sfw_data.fw_session;
192
193         if (sn == NULL || !sn->sn_timer_active)
194                 return 0;
195
196         LASSERT (sn->sn_timeout != 0);
197
198         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
199                 sn->sn_timer_active = 0;
200                 return 0;
201         }
202
203         return EBUSY; /* racing with sfw_session_expired() */
204 }
205
/*
 * Retire the current session, if there is one.
 *
 * The session is detached from sfw_data.fw_session, counted and queued
 * as a zombie, the services of all registered tests are aborted, and
 * every still-active batch of the session is force-stopped.  If no batch
 * was active the session is destroyed here; otherwise it is left on
 * fw_zombie_sessions (sfw_test_unit_done() destroys a zombie session
 * once none of its batches is active).
 *
 * called with sfw_data.fw_lock held; the lock is dropped and re-taken
 * inside, and is held again on return.
 */
static void
sfw_deactivate_session (void)
__must_hold(&sfw_data.fw_lock)
{
        sfw_session_t *sn = sfw_data.fw_session;
        int            nactive = 0;
        sfw_batch_t   *tsb;
        sfw_test_case_t *tsc;

        if (sn == NULL) return;

        /* caller must have defused the session timer already */
        LASSERT(!sn->sn_timer_active);

        /* from here on there is no current session; sn is a zombie */
        sfw_data.fw_session = NULL;
        atomic_inc(&sfw_data.fw_nzombies);
        list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);

        /* the services are aborted without fw_lock held */
        spin_unlock(&sfw_data.fw_lock);

        list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
                srpc_abort_service(tsc->tsc_srv_service);
        }

        spin_lock(&sfw_data.fw_lock);

        /* force-stop whatever is still running in this session */
        list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
                if (sfw_batch_active(tsb)) {
                        nactive++;
                        sfw_stop_batch(tsb, 1);
                }
        }

        if (nactive != 0)
                return; /* wait for active batches to stop */

        /* nothing was running: take sn off the zombie list and free it
         * with the lock dropped */
        list_del_init(&sn->sn_list);
        spin_unlock(&sfw_data.fw_lock);

        sfw_destroy_session(sn);

        spin_lock(&sfw_data.fw_lock);
}
249
250
251 static void
252 sfw_session_expired (void *data)
253 {
254         sfw_session_t *sn = data;
255
256         spin_lock(&sfw_data.fw_lock);
257
258         LASSERT (sn->sn_timer_active);
259         LASSERT (sn == sfw_data.fw_session);
260
261         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
262                libcfs_nid2str(sn->sn_id.ses_nid),
263                sn->sn_id.ses_stamp, &sn->sn_name[0]);
264
265         sn->sn_timer_active = 0;
266         sfw_deactivate_session();
267
268         spin_unlock(&sfw_data.fw_lock);
269 }
270
271 static inline void
272 sfw_init_session(sfw_session_t *sn, lst_sid_t sid,
273                  unsigned features, const char *name)
274 {
275         stt_timer_t *timer = &sn->sn_timer;
276
277         memset(sn, 0, sizeof(sfw_session_t));
278         INIT_LIST_HEAD(&sn->sn_list);
279         INIT_LIST_HEAD(&sn->sn_batches);
280         atomic_set(&sn->sn_refcount, 1);        /* +1 for caller */
281         atomic_set(&sn->sn_brw_errors, 0);
282         atomic_set(&sn->sn_ping_errors, 0);
283         strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
284
285         sn->sn_timer_active = 0;
286         sn->sn_id           = sid;
287         sn->sn_features     = features;
288         sn->sn_timeout      = session_timeout;
289         sn->sn_started      = cfs_time_current();
290
291         timer->stt_data = sn;
292         timer->stt_func = sfw_session_expired;
293         INIT_LIST_HEAD(&timer->stt_list);
294 }
295
296 /* completion handler for incoming framework RPCs */
297 static void
298 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
299 {
300         struct srpc_service     *sv     = rpc->srpc_scd->scd_svc;
301         int                     status  = rpc->srpc_status;
302
303         CDEBUG (D_NET,
304                 "Incoming framework RPC done: "
305                 "service %s, peer %s, status %s:%d\n",
306                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
307                 swi_state2str(rpc->srpc_wi.swi_state),
308                 status);
309
310         if (rpc->srpc_bulk != NULL)
311                 sfw_free_pages(rpc);
312         return;
313 }
314
315 static void
316 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
317 {
318         LASSERT(rpc->crpc_bulk.bk_niov == 0);
319         LASSERT(list_empty(&rpc->crpc_list));
320         LASSERT(atomic_read(&rpc->crpc_refcount) == 0);
321
322         CDEBUG(D_NET, "Outgoing framework RPC done: "
323                "service %d, peer %s, status %s:%d:%d\n",
324                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
325                swi_state2str(rpc->crpc_wi.swi_state),
326                rpc->crpc_aborted, rpc->crpc_status);
327
328         spin_lock(&sfw_data.fw_lock);
329
330         /* my callers must finish all RPCs before shutting me down */
331         LASSERT(!sfw_data.fw_shuttingdown);
332         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
333
334         spin_unlock(&sfw_data.fw_lock);
335 }
336
337 static sfw_batch_t *
338 sfw_find_batch (lst_bid_t bid)
339 {
340         sfw_session_t *sn = sfw_data.fw_session;
341         sfw_batch_t   *bat;
342
343         LASSERT(sn != NULL);
344
345         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
346                 if (bat->bat_id.bat_id == bid.bat_id)
347                         return bat;
348         }
349
350         return NULL;
351 }
352
353 static sfw_batch_t *
354 sfw_bid2batch (lst_bid_t bid)
355 {
356         sfw_session_t *sn = sfw_data.fw_session;
357         sfw_batch_t   *bat;
358
359         LASSERT (sn != NULL);
360
361         bat = sfw_find_batch(bid);
362         if (bat != NULL)
363                 return bat;
364
365         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
366         if (bat == NULL)
367                 return NULL;
368
369         bat->bat_error   = 0;
370         bat->bat_session = sn;
371         bat->bat_id      = bid;
372         atomic_set(&bat->bat_nactive, 0);
373         INIT_LIST_HEAD(&bat->bat_tests);
374
375         list_add_tail(&bat->bat_list, &sn->sn_batches);
376         return bat;
377 }
378
379 static int
380 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
381 {
382         sfw_session_t  *sn = sfw_data.fw_session;
383         sfw_counters_t *cnt = &reply->str_fw;
384         sfw_batch_t    *bat;
385         struct timeval  tv;
386
387         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
388
389         if (request->str_sid.ses_nid == LNET_NID_ANY) {
390                 reply->str_status = EINVAL;
391                 return 0;
392         }
393
394         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
395                 reply->str_status = ESRCH;
396                 return 0;
397         }
398
399         lnet_counters_get(&reply->str_lnet);
400         srpc_get_counters(&reply->str_rpc);
401
402         /* send over the msecs since the session was started
403          - with 32 bits to send, this is ~49 days */
404         cfs_duration_usec(cfs_time_sub(cfs_time_current(),
405                                        sn->sn_started), &tv);
406
407         cnt->running_ms      = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
408         cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
409         cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
410         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
411
412         cnt->active_batches = 0;
413         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
414                 if (atomic_read(&bat->bat_nactive) > 0)
415                         cnt->active_batches++;
416         }
417
418         reply->str_status = 0;
419         return 0;
420 }
421
422 int
423 sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
424 {
425         sfw_session_t *sn = sfw_data.fw_session;
426         srpc_msg_t    *msg = container_of(request, srpc_msg_t,
427                                           msg_body.mksn_reqst);
428         int            cplen = 0;
429
430         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
431                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
432                 reply->mksn_status = EINVAL;
433                 return 0;
434         }
435
436         if (sn != NULL) {
437                 reply->mksn_status  = 0;
438                 reply->mksn_sid     = sn->sn_id;
439                 reply->mksn_timeout = sn->sn_timeout;
440
441                 if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
442                         atomic_inc(&sn->sn_refcount);
443                         return 0;
444                 }
445
446                 if (!request->mksn_force) {
447                         reply->mksn_status = EBUSY;
448                         cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
449                                         sizeof(reply->mksn_name));
450                         if (cplen >= sizeof(reply->mksn_name))
451                                 return -E2BIG;
452                         return 0;
453                 }
454         }
455
456         /* reject the request if it requires unknown features
457          * NB: old version will always accept all features because it's not
458          * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
459          * harmless because it will return zero feature to console, and it's
460          * console's responsibility to make sure all nodes in a session have
461          * same feature mask. */
462         if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
463                 reply->mksn_status = EPROTO;
464                 return 0;
465         }
466
467         /* brand new or create by force */
468         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
469         if (sn == NULL) {
470                 CERROR("dropping RPC mksn under memory pressure\n");
471                 return -ENOMEM;
472         }
473
474         sfw_init_session(sn, request->mksn_sid,
475                          msg->msg_ses_feats, &request->mksn_name[0]);
476
477         spin_lock(&sfw_data.fw_lock);
478
479         sfw_deactivate_session();
480         LASSERT(sfw_data.fw_session == NULL);
481         sfw_data.fw_session = sn;
482
483         spin_unlock(&sfw_data.fw_lock);
484
485         reply->mksn_status  = 0;
486         reply->mksn_sid     = sn->sn_id;
487         reply->mksn_timeout = sn->sn_timeout;
488         return 0;
489 }
490
491 static int
492 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
493 {
494         sfw_session_t *sn = sfw_data.fw_session;
495
496         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
497
498         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
499                 reply->rmsn_status = EINVAL;
500                 return 0;
501         }
502
503         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
504                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
505                 return 0;
506         }
507
508         if (!atomic_dec_and_test(&sn->sn_refcount)) {
509                 reply->rmsn_status = 0;
510                 return 0;
511         }
512
513         spin_lock(&sfw_data.fw_lock);
514         sfw_deactivate_session();
515         spin_unlock(&sfw_data.fw_lock);
516
517         reply->rmsn_status = 0;
518         reply->rmsn_sid    = LST_INVALID_SID;
519         LASSERT(sfw_data.fw_session == NULL);
520         return 0;
521 }
522
523 static int
524 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
525 {
526         sfw_session_t *sn = sfw_data.fw_session;
527
528         if (sn == NULL) {
529                 reply->dbg_status = ESRCH;
530                 reply->dbg_sid    = LST_INVALID_SID;
531                 return 0;
532         }
533
534         reply->dbg_status  = 0;
535         reply->dbg_sid     = sn->sn_id;
536         reply->dbg_timeout = sn->sn_timeout;
537         if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
538             >= sizeof(reply->dbg_name))
539                 return -E2BIG;
540
541         return 0;
542 }
543
544 static void
545 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
546 {
547         sfw_test_unit_t     *tsu = rpc->crpc_priv;
548         sfw_test_instance_t *tsi = tsu->tsu_instance;
549
550         /* Called with hold of tsi->tsi_lock */
551         LASSERT(list_empty(&rpc->crpc_list));
552         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
553 }
554
/*
 * Compute how many service buffers the test instance \a tsi is given:
 * the smaller of the service's sv_wi_total and the instance's loop
 * count, divided by sv_ncpts, plus SFW_TEST_WI_EXTRA, but never less
 * than SFW_TEST_WI_MIN.  The test case for tsi->tsi_service must be
 * registered.
 */
static inline int
sfw_test_buffers(sfw_test_instance_t *tsi)
{
        struct sfw_test_case    *tsc;
        struct srpc_service     *svc;
        int                     nbuf;

        LASSERT(tsi != NULL);
        tsc = sfw_find_test_case(tsi->tsi_service);
        LASSERT(tsc != NULL);
        svc = tsc->tsc_srv_service;
        LASSERT(svc != NULL);

        /* per-sv_ncpts share of the capped work-item count */
        nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
        return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
}
571
572 static int
573 sfw_load_test(struct sfw_test_instance *tsi)
574 {
575         struct sfw_test_case    *tsc;
576         struct srpc_service     *svc;
577         int                     nbuf;
578         int                     rc;
579
580         LASSERT(tsi != NULL);
581         tsc = sfw_find_test_case(tsi->tsi_service);
582         nbuf = sfw_test_buffers(tsi);
583         LASSERT(tsc != NULL);
584         svc = tsc->tsc_srv_service;
585
586         if (tsi->tsi_is_client) {
587                 tsi->tsi_ops = tsc->tsc_cli_ops;
588                 return 0;
589         }
590
591         rc = srpc_service_add_buffers(svc, nbuf);
592         if (rc != 0) {
593                 CWARN("Failed to reserve enough buffers: "
594                       "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
595                 /* NB: this error handler is not strictly correct, because
596                  * it may release more buffers than already allocated,
597                  * but it doesn't matter because request portal should
598                  * be lazy portal and will grow buffers if necessary. */
599                 srpc_service_remove_buffers(svc, nbuf);
600                 return -ENOMEM;
601         }
602
603         CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
604                nbuf * (srpc_serv_is_framework(svc) ?
605                        1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
606         return 0;
607 }
608
609 static void
610 sfw_unload_test(struct sfw_test_instance *tsi)
611 {
612         struct sfw_test_case *tsc;
613
614         LASSERT(tsi != NULL);
615         tsc = sfw_find_test_case(tsi->tsi_service);
616         LASSERT(tsc != NULL);
617
618         if (tsi->tsi_is_client)
619                 return;
620
621         /* shrink buffers, because request portal is lazy portal
622          * which can grow buffers at runtime so we may leave
623          * some buffers behind, but never mind... */
624         srpc_service_remove_buffers(tsc->tsc_srv_service,
625                                     sfw_test_buffers(tsi));
626         return;
627 }
628
629 static void
630 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
631 {
632         srpc_client_rpc_t *rpc;
633         sfw_test_unit_t   *tsu;
634
635         if (!tsi->tsi_is_client) goto clean;
636
637         tsi->tsi_ops->tso_fini(tsi);
638
639         LASSERT(!tsi->tsi_stopping);
640         LASSERT(list_empty(&tsi->tsi_active_rpcs));
641         LASSERT(!sfw_test_active(tsi));
642
643         while (!list_empty(&tsi->tsi_units)) {
644                 tsu = list_entry(tsi->tsi_units.next,
645                                  sfw_test_unit_t, tsu_list);
646                 list_del(&tsu->tsu_list);
647                 LIBCFS_FREE(tsu, sizeof(*tsu));
648         }
649
650         while (!list_empty(&tsi->tsi_free_rpcs)) {
651                 rpc = list_entry(tsi->tsi_free_rpcs.next,
652                                  srpc_client_rpc_t, crpc_list);
653                 list_del(&rpc->crpc_list);
654                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
655         }
656
657 clean:
658         sfw_unload_test(tsi);
659         LIBCFS_FREE(tsi, sizeof(*tsi));
660         return;
661 }
662
663 static void
664 sfw_destroy_batch (sfw_batch_t *tsb)
665 {
666         sfw_test_instance_t *tsi;
667
668         LASSERT(!sfw_batch_active(tsb));
669         LASSERT(list_empty(&tsb->bat_list));
670
671         while (!list_empty(&tsb->bat_tests)) {
672                 tsi = list_entry(tsb->bat_tests.next,
673                                  sfw_test_instance_t, tsi_list);
674                 list_del_init(&tsi->tsi_list);
675                 sfw_destroy_test_instance(tsi);
676         }
677
678         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
679         return;
680 }
681
682 void
683 sfw_destroy_session (sfw_session_t *sn)
684 {
685         sfw_batch_t *batch;
686
687         LASSERT(list_empty(&sn->sn_list));
688         LASSERT(sn != sfw_data.fw_session);
689
690         while (!list_empty(&sn->sn_batches)) {
691                 batch = list_entry(sn->sn_batches.next,
692                                    sfw_batch_t, bat_list);
693                 list_del_init(&batch->bat_list);
694                 sfw_destroy_batch(batch);
695         }
696
697         LIBCFS_FREE(sn, sizeof(*sn));
698         atomic_dec(&sfw_data.fw_nzombies);
699         return;
700 }
701
702 static void
703 sfw_unpack_addtest_req(srpc_msg_t *msg)
704 {
705         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
706
707         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
708         LASSERT (req->tsr_is_client);
709
710         if (msg->msg_magic == SRPC_MSG_MAGIC)
711                 return; /* no flipping needed */
712
713         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
714
715         if (req->tsr_service == SRPC_SERVICE_BRW) {
716                 if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) {
717                         test_bulk_req_t *bulk = &req->tsr_u.bulk_v0;
718
719                         __swab32s(&bulk->blk_opc);
720                         __swab32s(&bulk->blk_npg);
721                         __swab32s(&bulk->blk_flags);
722
723                 } else {
724                         test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1;
725
726                         __swab16s(&bulk->blk_opc);
727                         __swab16s(&bulk->blk_flags);
728                         __swab32s(&bulk->blk_offset);
729                         __swab32s(&bulk->blk_len);
730                 }
731
732                 return;
733         }
734
735         if (req->tsr_service == SRPC_SERVICE_PING) {
736                 test_ping_req_t *ping = &req->tsr_u.ping;
737
738                 __swab32s(&ping->png_size);
739                 __swab32s(&ping->png_flags);
740                 return;
741         }
742
743         LBUG();
744         return;
745 }
746
/*
 * Create a test instance from the "add test" request carried by \a rpc
 * and append it to the (inactive) batch \a tsb.
 *
 * A server instance only needs sfw_load_test().  A client instance also
 * unpacks the test parameters, creates tsr_concur test units for each of
 * the tsr_ndest destinations listed in the bulk pages, and runs the
 * test's tso_init hook.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error
 * returned by sfw_load_test() / tso_init.
 */
static int
sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
{
        srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
        srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
        srpc_bulk_t         *bk = rpc->srpc_bulk;
        int                  ndest = req->tsr_ndest;
        sfw_test_unit_t     *tsu;
        sfw_test_instance_t *tsi;
        int                  i;
        int                  rc;

        LIBCFS_ALLOC(tsi, sizeof(*tsi));
        if (tsi == NULL) {
                CERROR ("Can't allocate test instance for batch: "LPU64"\n",
                        tsb->bat_id.bat_id);
                return -ENOMEM;
        }

        memset(tsi, 0, sizeof(*tsi));
        spin_lock_init(&tsi->tsi_lock);
        atomic_set(&tsi->tsi_nactive, 0);
        INIT_LIST_HEAD(&tsi->tsi_units);
        INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
        INIT_LIST_HEAD(&tsi->tsi_active_rpcs);

        /* copy the generic test parameters out of the request */
        tsi->tsi_stopping      = 0;
        tsi->tsi_batch         = tsb;
        tsi->tsi_loop          = req->tsr_loop;
        tsi->tsi_concur        = req->tsr_concur;
        tsi->tsi_service       = req->tsr_service;
        tsi->tsi_is_client     = !!(req->tsr_is_client);
        tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);

        rc = sfw_load_test(tsi);
        if (rc != 0) {
                /* nothing was loaded, so a plain free is enough here */
                LIBCFS_FREE(tsi, sizeof(*tsi));
                return rc;
        }

        LASSERT (!sfw_batch_active(tsb));

        if (!tsi->tsi_is_client) {
                /* it's test server, just add it to tsb */
                list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
                return 0;
        }

        /* the bulk must be large enough to hold all ndest packed ids */
        LASSERT (bk != NULL);
        LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
        LASSERT((unsigned int)bk->bk_len >=
                sizeof(lnet_process_id_packed_t) * ndest);

        sfw_unpack_addtest_req(msg);
        memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));

        for (i = 0; i < ndest; i++) {
                lnet_process_id_packed_t *dests;
                lnet_process_id_packed_t  id;
                int                       j;

                /* destination i lives in page i / SFW_ID_PER_PAGE,
                 * slot i % SFW_ID_PER_PAGE */
                dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
                LASSERT (dests != NULL);  /* my pages are within KVM always */
                id = dests[i % SFW_ID_PER_PAGE];
                if (msg->msg_magic != SRPC_MSG_MAGIC)
                        sfw_unpack_id(id);

                /* one test unit per concurrent stream to this destination */
                for (j = 0; j < tsi->tsi_concur; j++) {
                        LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
                        if (tsu == NULL) {
                                rc = -ENOMEM;
                                CERROR ("Can't allocate tsu for %d\n",
                                        tsi->tsi_service);
                                goto error;
                        }

                        tsu->tsu_dest.nid = id.nid;
                        tsu->tsu_dest.pid = id.pid;
                        tsu->tsu_instance = tsi;
                        tsu->tsu_private  = NULL;
                        list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
                }
        }

        rc = tsi->tsi_ops->tso_init(tsi);
        if (rc == 0) {
                list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
                return 0;
        }

error:
        /* sfw_destroy_test_instance() frees the units created so far,
         * unloads the test and frees tsi */
        LASSERT(rc != 0);
        sfw_destroy_test_instance(tsi);
        return rc;
}
842
/*
 * Account for the completion of test unit \a tsu.
 *
 * When the last active unit of its test instance finishes, the instance
 * leaves the "stopping" state and the batch's active count is dropped.
 * If that leaves the batch idle and its session is no longer the
 * current one (i.e. it is a zombie), the session is destroyed as soon
 * as none of its batches is active.
 */
static void
sfw_test_unit_done (sfw_test_unit_t *tsu)
{
        sfw_test_instance_t *tsi = tsu->tsu_instance;
        sfw_batch_t         *tsb = tsi->tsi_batch;
        sfw_session_t       *sn = tsb->bat_session;

        LASSERT (sfw_test_active(tsi));

        /* other units of this instance are still running */
        if (!atomic_dec_and_test(&tsi->tsi_nactive))
                return;

        /* the test instance is done */
        spin_lock(&tsi->tsi_lock);

        tsi->tsi_stopping = 0;

        spin_unlock(&tsi->tsi_lock);

        spin_lock(&sfw_data.fw_lock);

        if (!atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */
            sn == sfw_data.fw_session) {                  /* sn also active */
                spin_unlock(&sfw_data.fw_lock);
                return;
        }

        LASSERT(!list_empty(&sn->sn_list)); /* I'm a zombie! */

        /* tsb is reused as the iterator: keep the zombie session alive
         * while any of its batches is still active */
        list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
                if (sfw_batch_active(tsb)) {
                        spin_unlock(&sfw_data.fw_lock);
                        return;
                }
        }

        /* last activity gone: unlink under the lock, destroy without it */
        list_del_init(&sn->sn_list);
        spin_unlock(&sfw_data.fw_lock);

        sfw_destroy_session(sn);
        return;
}
885
886 static void
887 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
888 {
889         sfw_test_unit_t     *tsu = rpc->crpc_priv;
890         sfw_test_instance_t *tsi = tsu->tsu_instance;
891         int                  done = 0;
892
893         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
894
895         spin_lock(&tsi->tsi_lock);
896
897         LASSERT(sfw_test_active(tsi));
898         LASSERT(!list_empty(&rpc->crpc_list));
899
900         list_del_init(&rpc->crpc_list);
901
902         /* batch is stopping or loop is done or get error */
903         if (tsi->tsi_stopping ||
904             tsu->tsu_loop == 0 ||
905             (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
906                 done = 1;
907
908         /* dec ref for poster */
909         srpc_client_rpc_decref(rpc);
910
911         spin_unlock(&tsi->tsi_lock);
912
913         if (!done) {
914                 swi_schedule_workitem(&tsu->tsu_worker);
915                 return;
916         }
917
918         sfw_test_unit_done(tsu);
919         return;
920 }
921
922 int
923 sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer,
924                     unsigned features, int nblk, int blklen,
925                     srpc_client_rpc_t **rpcpp)
926 {
927         srpc_client_rpc_t   *rpc = NULL;
928         sfw_test_instance_t *tsi = tsu->tsu_instance;
929
930         spin_lock(&tsi->tsi_lock);
931
932         LASSERT (sfw_test_active(tsi));
933
934         if (!list_empty(&tsi->tsi_free_rpcs)) {
935                 /* pick request from buffer */
936                 rpc = list_entry(tsi->tsi_free_rpcs.next,
937                                  srpc_client_rpc_t, crpc_list);
938                 LASSERT(nblk == rpc->crpc_bulk.bk_niov);
939                 list_del_init(&rpc->crpc_list);
940         }
941
942         spin_unlock(&tsi->tsi_lock);
943
944         if (rpc == NULL) {
945                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
946                                              blklen, sfw_test_rpc_done,
947                                              sfw_test_rpc_fini, tsu);
948         } else {
949                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
950                                      blklen, sfw_test_rpc_done,
951                                      sfw_test_rpc_fini, tsu);
952         }
953
954         if (rpc == NULL) {
955                 CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
956                 return -ENOMEM;
957         }
958
959         rpc->crpc_reqstmsg.msg_ses_feats = features;
960         *rpcpp = rpc;
961
962         return 0;
963 }
964
965 static int
966 sfw_run_test (swi_workitem_t *wi)
967 {
968         sfw_test_unit_t     *tsu = wi->swi_workitem.wi_data;
969         sfw_test_instance_t *tsi = tsu->tsu_instance;
970         srpc_client_rpc_t   *rpc = NULL;
971
972         LASSERT (wi == &tsu->tsu_worker);
973
974         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
975                 LASSERT (rpc == NULL);
976                 goto test_done;
977         }
978
979         LASSERT (rpc != NULL);
980
981         spin_lock(&tsi->tsi_lock);
982
983         if (tsi->tsi_stopping) {
984                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
985                 spin_unlock(&tsi->tsi_lock);
986                 goto test_done;
987         }
988
989         if (tsu->tsu_loop > 0)
990                 tsu->tsu_loop--;
991
992         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
993         spin_unlock(&tsi->tsi_lock);
994
995         spin_lock(&rpc->crpc_lock);
996         rpc->crpc_timeout = rpc_timeout;
997         srpc_post_rpc(rpc);
998         spin_unlock(&rpc->crpc_lock);
999         return 0;
1000
1001 test_done:
1002         /*
1003          * No one can schedule me now since:
1004          * - previous RPC, if any, has done and
1005          * - no new RPC is initiated.
1006          * - my batch is still active; no one can run it again now.
1007          * Cancel pending schedules and prevent future schedule attempts:
1008          */
1009         swi_exit_workitem(wi);
1010         sfw_test_unit_done(tsu);
1011         return 1;
1012 }
1013
1014 static int
1015 sfw_run_batch (sfw_batch_t *tsb)
1016 {
1017         swi_workitem_t      *wi;
1018         sfw_test_unit_t     *tsu;
1019         sfw_test_instance_t *tsi;
1020
1021         if (sfw_batch_active(tsb)) {
1022                 CDEBUG(D_NET, "Batch already active: "LPU64" (%d)\n",
1023                        tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
1024                 return 0;
1025         }
1026
1027         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1028                 if (!tsi->tsi_is_client)        /* skip server instances */
1029                         continue;
1030
1031                 LASSERT(!tsi->tsi_stopping);
1032                 LASSERT(!sfw_test_active(tsi));
1033
1034                 atomic_inc(&tsb->bat_nactive);
1035
1036                 list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
1037                         atomic_inc(&tsi->tsi_nactive);
1038                         tsu->tsu_loop = tsi->tsi_loop;
1039                         wi = &tsu->tsu_worker;
1040                         swi_init_workitem(wi, tsu, sfw_run_test,
1041                                           lst_sched_test[\
1042                                           lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1043                         swi_schedule_workitem(wi);
1044                 }
1045         }
1046
1047         return 0;
1048 }
1049
1050 int
1051 sfw_stop_batch (sfw_batch_t *tsb, int force)
1052 {
1053         sfw_test_instance_t *tsi;
1054         srpc_client_rpc_t   *rpc;
1055
1056         if (!sfw_batch_active(tsb)) {
1057                 CDEBUG(D_NET, "Batch "LPU64" inactive\n", tsb->bat_id.bat_id);
1058                 return 0;
1059         }
1060
1061         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1062                 spin_lock(&tsi->tsi_lock);
1063
1064                 if (!tsi->tsi_is_client ||
1065                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
1066                         spin_unlock(&tsi->tsi_lock);
1067                         continue;
1068                 }
1069
1070                 tsi->tsi_stopping = 1;
1071
1072                 if (!force) {
1073                         spin_unlock(&tsi->tsi_lock);
1074                         continue;
1075                 }
1076
1077                 /* abort launched rpcs in the test */
1078                 list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
1079                         spin_lock(&rpc->crpc_lock);
1080
1081                         srpc_abort_rpc(rpc, -EINTR);
1082
1083                         spin_unlock(&rpc->crpc_lock);
1084                 }
1085
1086                 spin_unlock(&tsi->tsi_lock);
1087         }
1088
1089         return 0;
1090 }
1091
1092 static int
1093 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
1094 {
1095         sfw_test_instance_t *tsi;
1096
1097         if (testidx < 0)
1098                 return -EINVAL;
1099
1100         if (testidx == 0) {
1101                 reply->bar_active = atomic_read(&tsb->bat_nactive);
1102                 return 0;
1103         }
1104
1105         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1106                 if (testidx-- > 1)
1107                         continue;
1108
1109                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
1110                 return 0;
1111         }
1112
1113         return -ENOENT;
1114 }
1115
1116 void
1117 sfw_free_pages (srpc_server_rpc_t *rpc)
1118 {
1119         srpc_free_bulk(rpc->srpc_bulk);
1120         rpc->srpc_bulk = NULL;
1121 }
1122
1123 int
1124 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1125                 int sink)
1126 {
1127         LASSERT(rpc->srpc_bulk == NULL);
1128         LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1129
1130         rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1131         if (rpc->srpc_bulk == NULL)
1132                 return -ENOMEM;
1133
1134         return 0;
1135 }
1136
1137 static int
1138 sfw_add_test (srpc_server_rpc_t *rpc)
1139 {
1140         sfw_session_t     *sn = sfw_data.fw_session;
1141         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1142         srpc_test_reqst_t *request;
1143         int                rc;
1144         sfw_batch_t       *bat;
1145
1146         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1147         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1148
1149         if (request->tsr_loop == 0 ||
1150             request->tsr_concur == 0 ||
1151             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1152             request->tsr_ndest > SFW_MAX_NDESTS ||
1153             (request->tsr_is_client && request->tsr_ndest == 0) ||
1154             request->tsr_concur > SFW_MAX_CONCUR ||
1155             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1156             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1157                 reply->tsr_status = EINVAL;
1158                 return 0;
1159         }
1160
1161         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1162             sfw_find_test_case(request->tsr_service) == NULL) {
1163                 reply->tsr_status = ENOENT;
1164                 return 0;
1165         }
1166
1167         bat = sfw_bid2batch(request->tsr_bid);
1168         if (bat == NULL) {
1169                 CERROR("dropping RPC %s from %s under memory pressure\n",
1170                         rpc->srpc_scd->scd_svc->sv_name,
1171                         libcfs_id2str(rpc->srpc_peer));
1172                 return -ENOMEM;
1173         }
1174
1175         if (sfw_batch_active(bat)) {
1176                 reply->tsr_status = EBUSY;
1177                 return 0;
1178         }
1179
1180         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1181                 /* rpc will be resumed later in sfw_bulk_ready */
1182                 int     npg = sfw_id_pages(request->tsr_ndest);
1183                 int     len;
1184
1185                 if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) {
1186                         len = npg * PAGE_CACHE_SIZE;
1187
1188                 } else  {
1189                         len = sizeof(lnet_process_id_packed_t) *
1190                               request->tsr_ndest;
1191                 }
1192
1193                 return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1194         }
1195
1196         rc = sfw_add_test_instance(bat, rpc);
1197         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1198                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1199                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1200                 request->tsr_is_client ? "client" : "server",
1201                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1202
1203         reply->tsr_status = (rc < 0) ? -rc : rc;
1204         return 0;
1205 }
1206
1207 static int
1208 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1209 {
1210         sfw_session_t *sn = sfw_data.fw_session;
1211         int            rc = 0;
1212         sfw_batch_t   *bat;
1213
1214         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1215
1216         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1217                 reply->bar_status = ESRCH;
1218                 return 0;
1219         }
1220
1221         bat = sfw_find_batch(request->bar_bid);
1222         if (bat == NULL) {
1223                 reply->bar_status = ENOENT;
1224                 return 0;
1225         }
1226
1227         switch (request->bar_opc) {
1228         case SRPC_BATCH_OPC_RUN:
1229                 rc = sfw_run_batch(bat);
1230                 break;
1231
1232         case SRPC_BATCH_OPC_STOP:
1233                 rc = sfw_stop_batch(bat, request->bar_arg);
1234                 break;
1235
1236         case SRPC_BATCH_OPC_QUERY:
1237                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1238                 break;
1239
1240         default:
1241                 return -EINVAL; /* drop it */
1242         }
1243
1244         reply->bar_status = (rc < 0) ? -rc : rc;
1245         return 0;
1246 }
1247
1248 static int
1249 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1250 {
1251         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1252         srpc_msg_t     *reply   = &rpc->srpc_replymsg;
1253         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1254         unsigned        features = LST_FEATS_MASK;
1255         int             rc = 0;
1256
1257         LASSERT(sfw_data.fw_active_srpc == NULL);
1258         LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1259
1260         spin_lock(&sfw_data.fw_lock);
1261
1262         if (sfw_data.fw_shuttingdown) {
1263                 spin_unlock(&sfw_data.fw_lock);
1264                 return -ESHUTDOWN;
1265         }
1266
1267         /* Remove timer to avoid racing with it or expiring active session */
1268         if (sfw_del_session_timer() != 0) {
1269                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1270                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1271                 spin_unlock(&sfw_data.fw_lock);
1272                 return -EAGAIN;
1273         }
1274
1275         sfw_data.fw_active_srpc = rpc;
1276         spin_unlock(&sfw_data.fw_lock);
1277
1278         sfw_unpack_message(request);
1279         LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1280
1281         /* rpc module should have checked this */
1282         LASSERT(request->msg_version == SRPC_MSG_VERSION);
1283
1284         if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1285             sv->sv_id != SRPC_SERVICE_DEBUG) {
1286                 sfw_session_t *sn = sfw_data.fw_session;
1287
1288                 if (sn != NULL &&
1289                     sn->sn_features != request->msg_ses_feats) {
1290                         CNETERR("Features of framework RPC don't match "
1291                                 "features of current session: %x/%x\n",
1292                                 request->msg_ses_feats, sn->sn_features);
1293                         reply->msg_body.reply.status = EPROTO;
1294                         reply->msg_body.reply.sid    = sn->sn_id;
1295                         goto out;
1296                 }
1297
1298         } else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
1299                 /* NB: at this point, old version will ignore features and
1300                  * create new session anyway, so console should be able
1301                  * to handle this */
1302                 reply->msg_body.reply.status = EPROTO;
1303                 goto out;
1304         }
1305
1306         switch(sv->sv_id) {
1307         default:
1308                 LBUG ();
1309         case SRPC_SERVICE_TEST:
1310                 rc = sfw_add_test(rpc);
1311                 break;
1312
1313         case SRPC_SERVICE_BATCH:
1314                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1315                                        &reply->msg_body.bat_reply);
1316                 break;
1317
1318         case SRPC_SERVICE_QUERY_STAT:
1319                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1320                                    &reply->msg_body.stat_reply);
1321                 break;
1322
1323         case SRPC_SERVICE_DEBUG:
1324                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1325                                        &reply->msg_body.dbg_reply);
1326                 break;
1327
1328         case SRPC_SERVICE_MAKE_SESSION:
1329                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1330                                       &reply->msg_body.mksn_reply);
1331                 break;
1332
1333         case SRPC_SERVICE_REMOVE_SESSION:
1334                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1335                                         &reply->msg_body.rmsn_reply);
1336                 break;
1337         }
1338
1339         if (sfw_data.fw_session != NULL)
1340                 features = sfw_data.fw_session->sn_features;
1341  out:
1342         reply->msg_ses_feats = features;
1343         rpc->srpc_done = sfw_server_rpc_done;
1344         spin_lock(&sfw_data.fw_lock);
1345
1346         if (!sfw_data.fw_shuttingdown)
1347                 sfw_add_session_timer();
1348
1349         sfw_data.fw_active_srpc = NULL;
1350         spin_unlock(&sfw_data.fw_lock);
1351         return rc;
1352 }
1353
1354 static int
1355 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1356 {
1357         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1358         int                     rc;
1359
1360         LASSERT(rpc->srpc_bulk != NULL);
1361         LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1362         LASSERT(sfw_data.fw_active_srpc == NULL);
1363         LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1364
1365         spin_lock(&sfw_data.fw_lock);
1366
1367         if (status != 0) {
1368                 CERROR("Bulk transfer failed for RPC: "
1369                        "service %s, peer %s, status %d\n",
1370                        sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1371                 spin_unlock(&sfw_data.fw_lock);
1372                 return -EIO;
1373         }
1374
1375         if (sfw_data.fw_shuttingdown) {
1376                 spin_unlock(&sfw_data.fw_lock);
1377                 return -ESHUTDOWN;
1378         }
1379
1380         if (sfw_del_session_timer() != 0) {
1381                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1382                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1383                 spin_unlock(&sfw_data.fw_lock);
1384                 return -EAGAIN;
1385         }
1386
1387         sfw_data.fw_active_srpc = rpc;
1388         spin_unlock(&sfw_data.fw_lock);
1389
1390         rc = sfw_add_test(rpc);
1391
1392         spin_lock(&sfw_data.fw_lock);
1393
1394         if (!sfw_data.fw_shuttingdown)
1395                 sfw_add_session_timer();
1396
1397         sfw_data.fw_active_srpc = NULL;
1398         spin_unlock(&sfw_data.fw_lock);
1399         return rc;
1400 }
1401
1402 srpc_client_rpc_t *
1403 sfw_create_rpc(lnet_process_id_t peer, int service,
1404                unsigned features, int nbulkiov, int bulklen,
1405                void (*done)(srpc_client_rpc_t *), void *priv)
1406 {
1407         srpc_client_rpc_t *rpc = NULL;
1408
1409         spin_lock(&sfw_data.fw_lock);
1410
1411         LASSERT (!sfw_data.fw_shuttingdown);
1412         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1413
1414         if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1415                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1416                                      srpc_client_rpc_t, crpc_list);
1417                 list_del(&rpc->crpc_list);
1418
1419                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1420                                      done, sfw_client_rpc_fini, priv);
1421         }
1422
1423         spin_unlock(&sfw_data.fw_lock);
1424
1425         if (rpc == NULL) {
1426                 rpc = srpc_create_client_rpc(peer, service,
1427                                              nbulkiov, bulklen, done,
1428                                              nbulkiov != 0 ?  NULL :
1429                                              sfw_client_rpc_fini,
1430                                              priv);
1431         }
1432
1433         if (rpc != NULL) /* "session" is concept in framework */
1434                 rpc->crpc_reqstmsg.msg_ses_feats = features;
1435
1436         return rpc;
1437 }
1438
1439 void
1440 sfw_unpack_message (srpc_msg_t *msg)
1441 {
1442         if (msg->msg_magic == SRPC_MSG_MAGIC)
1443                 return; /* no flipping needed */
1444
1445         /* srpc module should guarantee I wouldn't get crap */
1446         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1447
1448         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1449                 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1450
1451                 __swab32s(&req->str_type);
1452                 __swab64s(&req->str_rpyid);
1453                 sfw_unpack_sid(req->str_sid);
1454                 return;
1455         }
1456
1457         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1458                 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1459
1460                 __swab32s(&rep->str_status);
1461                 sfw_unpack_sid(rep->str_sid);
1462                 sfw_unpack_fw_counters(rep->str_fw);
1463                 sfw_unpack_rpc_counters(rep->str_rpc);
1464                 sfw_unpack_lnet_counters(rep->str_lnet);
1465                 return;
1466         }
1467
1468         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1469                 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1470
1471                 __swab64s(&req->mksn_rpyid);
1472                 __swab32s(&req->mksn_force);
1473                 sfw_unpack_sid(req->mksn_sid);
1474                 return;
1475         }
1476
1477         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1478                 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1479
1480                 __swab32s(&rep->mksn_status);
1481                 __swab32s(&rep->mksn_timeout);
1482                 sfw_unpack_sid(rep->mksn_sid);
1483                 return;
1484         }
1485
1486         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1487                 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1488
1489                 __swab64s(&req->rmsn_rpyid);
1490                 sfw_unpack_sid(req->rmsn_sid);
1491                 return;
1492         }
1493
1494         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1495                 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1496
1497                 __swab32s(&rep->rmsn_status);
1498                 sfw_unpack_sid(rep->rmsn_sid);
1499                 return;
1500         }
1501
1502         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1503                 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1504
1505                 __swab64s(&req->dbg_rpyid);
1506                 __swab32s(&req->dbg_flags);
1507                 sfw_unpack_sid(req->dbg_sid);
1508                 return;
1509         }
1510
1511         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1512                 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1513
1514                 __swab32s(&rep->dbg_nbatch);
1515                 __swab32s(&rep->dbg_timeout);
1516                 sfw_unpack_sid(rep->dbg_sid);
1517                 return;
1518         }
1519
1520         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1521                 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1522
1523                 __swab32s(&req->bar_opc);
1524                 __swab64s(&req->bar_rpyid);
1525                 __swab32s(&req->bar_testidx);
1526                 __swab32s(&req->bar_arg);
1527                 sfw_unpack_sid(req->bar_sid);
1528                 __swab64s(&req->bar_bid.bat_id);
1529                 return;
1530         }
1531
1532         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1533                 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1534
1535                 __swab32s(&rep->bar_status);
1536                 sfw_unpack_sid(rep->bar_sid);
1537                 return;
1538         }
1539
1540         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1541                 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1542
1543                 __swab64s(&req->tsr_rpyid);
1544                 __swab64s(&req->tsr_bulkid);
1545                 __swab32s(&req->tsr_loop);
1546                 __swab32s(&req->tsr_ndest);
1547                 __swab32s(&req->tsr_concur);
1548                 __swab32s(&req->tsr_service);
1549                 sfw_unpack_sid(req->tsr_sid);
1550                 __swab64s(&req->tsr_bid.bat_id);
1551                 return;
1552         }
1553
1554         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1555                 srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1556
1557                 __swab32s(&rep->tsr_status);
1558                 sfw_unpack_sid(rep->tsr_sid);
1559                 return;
1560         }
1561
1562         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1563                 srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1564
1565                 __swab64s(&req->join_rpyid);
1566                 sfw_unpack_sid(req->join_sid);
1567                 return;
1568         }
1569
1570         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1571                 srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1572
1573                 __swab32s(&rep->join_status);
1574                 __swab32s(&rep->join_timeout);
1575                 sfw_unpack_sid(rep->join_sid);
1576                 return;
1577         }
1578
1579         LBUG ();
1580         return;
1581 }
1582
1583 void
1584 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1585 {
1586         LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1587         LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1588
1589         spin_lock(&rpc->crpc_lock);
1590         srpc_abort_rpc(rpc, -EINTR);
1591         spin_unlock(&rpc->crpc_lock);
1592         return;
1593 }
1594
1595 void
1596 sfw_post_rpc (srpc_client_rpc_t *rpc)
1597 {
1598         spin_lock(&rpc->crpc_lock);
1599
1600         LASSERT(!rpc->crpc_closed);
1601         LASSERT(!rpc->crpc_aborted);
1602         LASSERT(list_empty(&rpc->crpc_list));
1603         LASSERT(!sfw_data.fw_shuttingdown);
1604
1605         rpc->crpc_timeout = rpc_timeout;
1606         srpc_post_rpc(rpc);
1607
1608         spin_unlock(&rpc->crpc_lock);
1609         return;
1610 }
1611
1612 static srpc_service_t sfw_services[] =
1613 {
1614         {
1615                 /* sv_id */    SRPC_SERVICE_DEBUG,
1616                 /* sv_name */  "debug",
1617                 0
1618         },
1619         {
1620                 /* sv_id */    SRPC_SERVICE_QUERY_STAT,
1621                 /* sv_name */  "query stats",
1622                 0
1623         },
1624         {
1625                 /* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1626                 /* sv_name */  "make session",
1627                 0
1628         },
1629         {
1630                 /* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1631                 /* sv_name */  "remove session",
1632                 0
1633         },
1634         {
1635                 /* sv_id */    SRPC_SERVICE_BATCH,
1636                 /* sv_name */  "batch service",
1637                 0
1638         },
1639         {
1640                 /* sv_id */    SRPC_SERVICE_TEST,
1641                 /* sv_name */  "test service",
1642                 0
1643         },
1644         {
1645                 /* sv_id */    0,
1646                 /* sv_name */  NULL,
1647                 0
1648         }
1649 };
1650
1651 int
1652 sfw_startup (void)
1653 {
1654         int              i;
1655         int              rc;
1656         int              error;
1657         srpc_service_t  *sv;
1658         sfw_test_case_t *tsc;
1659
1660
1661         if (session_timeout < 0) {
1662                 CERROR ("Session timeout must be non-negative: %d\n",
1663                         session_timeout);
1664                 return -EINVAL;
1665         }
1666
1667         if (rpc_timeout < 0) {
1668                 CERROR ("RPC timeout must be non-negative: %d\n",
1669                         rpc_timeout);
1670                 return -EINVAL;
1671         }
1672
1673         if (session_timeout == 0)
1674                 CWARN ("Zero session_timeout specified "
1675                        "- test sessions never expire.\n");
1676
1677         if (rpc_timeout == 0)
1678                 CWARN ("Zero rpc_timeout specified "
1679                        "- test RPC never expire.\n");
1680
1681         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1682
1683         sfw_data.fw_session     = NULL;
1684         sfw_data.fw_active_srpc = NULL;
1685         spin_lock_init(&sfw_data.fw_lock);
1686         atomic_set(&sfw_data.fw_nzombies, 0);
1687         INIT_LIST_HEAD(&sfw_data.fw_tests);
1688         INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1689         INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1690
1691         brw_init_test_client();
1692         brw_init_test_service();
1693         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1694         LASSERT (rc == 0);
1695
1696         ping_init_test_client();
1697         ping_init_test_service();
1698         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1699         LASSERT (rc == 0);
1700
1701         error = 0;
1702         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1703                 sv = tsc->tsc_srv_service;
1704
1705                 rc = srpc_add_service(sv);
1706                 LASSERT(rc != -EBUSY);
1707                 if (rc != 0) {
1708                         CWARN("Failed to add %s service: %d\n",
1709                               sv->sv_name, rc);
1710                         error = rc;
1711                 }
1712         }
1713
1714         for (i = 0; ; i++) {
1715                 sv = &sfw_services[i];
1716                 if (sv->sv_name == NULL) break;
1717
1718                 sv->sv_bulk_ready = NULL;
1719                 sv->sv_handler    = sfw_handle_server_rpc;
1720                 sv->sv_wi_total   = SFW_FRWK_WI_MAX;
1721                 if (sv->sv_id == SRPC_SERVICE_TEST)
1722                         sv->sv_bulk_ready = sfw_bulk_ready;
1723
1724                 rc = srpc_add_service(sv);
1725                 LASSERT (rc != -EBUSY);
1726                 if (rc != 0) {
1727                         CWARN ("Failed to add %s service: %d\n",
1728                                sv->sv_name, rc);
1729                         error = rc;
1730                 }
1731
1732                 /* about to sfw_shutdown, no need to add buffer */
1733                 if (error) continue;
1734
1735                 rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1736                 if (rc != 0) {
1737                         CWARN("Failed to reserve enough buffers: "
1738                               "service %s, %d needed: %d\n",
1739                               sv->sv_name, sv->sv_wi_total, rc);
1740                         error = -ENOMEM;
1741                 }
1742         }
1743
1744         if (error != 0)
1745                 sfw_shutdown();
1746         return error;
1747 }
1748
1749 void
1750 sfw_shutdown (void)
1751 {
1752         srpc_service_t  *sv;
1753         sfw_test_case_t *tsc;
1754         int              i;
1755
1756         spin_lock(&sfw_data.fw_lock);
1757
1758         sfw_data.fw_shuttingdown = 1;
1759         lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1760                        "waiting for active RPC to finish.\n");
1761
1762         if (sfw_del_session_timer() != 0)
1763                 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1764                                "waiting for session timer to explode.\n");
1765
1766         sfw_deactivate_session();
1767         lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1768                        sfw_data.fw_lock,
1769                        "waiting for %d zombie sessions to die.\n",
1770                        atomic_read(&sfw_data.fw_nzombies));
1771
1772         spin_unlock(&sfw_data.fw_lock);
1773
1774         for (i = 0; ; i++) {
1775                 sv = &sfw_services[i];
1776                 if (sv->sv_name == NULL)
1777                         break;
1778
1779                 srpc_shutdown_service(sv);
1780                 srpc_remove_service(sv);
1781         }
1782
1783         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1784                 sv = tsc->tsc_srv_service;
1785                 srpc_shutdown_service(sv);
1786                 srpc_remove_service(sv);
1787         }
1788
1789         while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1790                 srpc_client_rpc_t *rpc;
1791
1792                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1793                                  srpc_client_rpc_t, crpc_list);
1794                 list_del(&rpc->crpc_list);
1795
1796                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1797         }
1798
1799         for (i = 0; ; i++) {
1800                 sv = &sfw_services[i];
1801                 if (sv->sv_name == NULL)
1802                         break;
1803
1804                 srpc_wait_service_shutdown(sv);
1805         }
1806
1807         while (!list_empty(&sfw_data.fw_tests)) {
1808                 tsc = list_entry(sfw_data.fw_tests.next,
1809                                  sfw_test_case_t, tsc_list);
1810
1811                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1812
1813                 list_del(&tsc->tsc_list);
1814                 LIBCFS_FREE(tsc, sizeof(*tsc));
1815         }
1816
1817         return;
1818 }