Whamcloud - gitweb
LU-6068 misc: update Intel copyright messages 2014
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/framework.c
37  *
38  * Author: Isaac Huang <isaac@clusterfs.com>
39  * Author: Liang Zhen  <liangzhen@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LNET
43
44 #include "selftest.h"
45
/* Session id reported in replies when this node currently has no session
 * (see the "sn == NULL ? LST_INVALID_SID : ..." assignments in this file). */
46 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
47
/* Lifetime given to every new session: sfw_init_session() copies it into
 * sn_timeout, and sfw_add_session_timer() arms no timer when it is 0. */
48 static int session_timeout = 100;
49 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
50                 "test session timeout in seconds (100 by default, 0 == never)");
51
/* RPC timeout in seconds, per its description string below; it is not
 * referenced in the part of the file visible here. */
52 static int rpc_timeout = 64;
53 CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
54                 "rpc timeout in seconds (64 by default, 0 == never)");
55
/*
 * Byte-swapping helpers.  Each macro swaps, in place, every field of the
 * structure passed to it (by value name, not by pointer).
 * sfw_add_test_instance() applies sfw_unpack_id() when the request's
 * msg_magic differs from SRPC_MSG_MAGIC, i.e. the sender used the opposite
 * byte order.
 */

/* swap the nid (64 bit) and pid (32 bit) of a process id */
56 #define sfw_unpack_id(id)               \
57 do {                                    \
58         __swab64s(&(id).nid);           \
59         __swab32s(&(id).pid);           \
60 } while (0)
61
/* swap both 64-bit members of a session id */
62 #define sfw_unpack_sid(sid)             \
63 do {                                    \
64         __swab64s(&(sid).ses_nid);      \
65         __swab64s(&(sid).ses_stamp);    \
66 } while (0)
67
/* swap the 32-bit framework counters (the ones filled in by sfw_get_stats()) */
68 #define sfw_unpack_fw_counters(fc)        \
69 do {                                      \
70         __swab32s(&(fc).running_ms);      \
71         __swab32s(&(fc).active_batches);  \
72         __swab32s(&(fc).zombie_sessions); \
73         __swab32s(&(fc).brw_errors);      \
74         __swab32s(&(fc).ping_errors);     \
75 } while (0)
76
/* swap the RPC counters: five 32-bit counts followed by two 64-bit bulk totals */
77 #define sfw_unpack_rpc_counters(rc)     \
78 do {                                    \
79         __swab32s(&(rc).errors);        \
80         __swab32s(&(rc).rpcs_sent);     \
81         __swab32s(&(rc).rpcs_rcvd);     \
82         __swab32s(&(rc).rpcs_dropped);  \
83         __swab32s(&(rc).rpcs_expired);  \
84         __swab64s(&(rc).bulk_get);      \
85         __swab64s(&(rc).bulk_put);      \
86 } while (0)
87
/* swap the LNet counters: seven 32-bit counts followed by four 64-bit lengths */
88 #define sfw_unpack_lnet_counters(lc)    \
89 do {                                    \
90         __swab32s(&(lc).errors);        \
91         __swab32s(&(lc).msgs_max);      \
92         __swab32s(&(lc).msgs_alloc);    \
93         __swab32s(&(lc).send_count);    \
94         __swab32s(&(lc).recv_count);    \
95         __swab32s(&(lc).drop_count);    \
96         __swab32s(&(lc).route_count);   \
97         __swab64s(&(lc).send_length);   \
98         __swab64s(&(lc).recv_length);   \
99         __swab64s(&(lc).drop_length);   \
100         __swab64s(&(lc).route_length);  \
101 } while (0)
102
/* true while the test instance / batch has a non-zero active counter */
103 #define sfw_test_active(t)      (atomic_read(&(t)->tsi_nactive) != 0)
104 #define sfw_batch_active(b)     (atomic_read(&(b)->bat_nactive) != 0)
105
/* Global state of the selftest framework; the single instance is sfw_data. */
106 struct smoketest_framework {
107         /* finished client RPCs queued by sfw_client_rpc_fini() for recycling */
108         struct list_head        fw_zombie_rpcs;
109         /* deactivated sessions that still await sfw_destroy_session() */
110         struct list_head        fw_zombie_sessions;
111         /* registered test cases (sfw_test_case_t, linked by tsc_list) */
112         struct list_head        fw_tests;
113         /* # zombie sessions: +1 in sfw_deactivate_session(),
             * -1 in sfw_destroy_session() */
114         atomic_t                fw_nzombies;
115         /* serialise */
116         spinlock_t              fw_lock;
117         /* _the_ session: the current one, or NULL if there is none */
118         sfw_session_t           *fw_session;
119         /* shutdown in progress */
120         int                     fw_shuttingdown;
121         /* running RPC (not referenced in the part of the file shown here) */
122         srpc_server_rpc_t       *fw_active_srpc;
123 } sfw_data;
124
125 /* forward declarations: sfw_deactivate_session() calls both before they are defined */
126 int sfw_stop_batch (sfw_batch_t *tsb, int force);
127 void sfw_destroy_session (sfw_session_t *sn);
128
129 static inline sfw_test_case_t *
130 sfw_find_test_case(int id)
131 {
132         sfw_test_case_t *tsc;
133
134         LASSERT(id <= SRPC_SERVICE_MAX_ID);
135         LASSERT(id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
136
137         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
138                 if (tsc->tsc_srv_service->sv_id == id)
139                         return tsc;
140         }
141
142         return NULL;
143 }
144
145 static int
146 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
147 {
148         sfw_test_case_t *tsc;
149
150         if (sfw_find_test_case(service->sv_id) != NULL) {
151                 CERROR ("Failed to register test %s (%d)\n",
152                         service->sv_name, service->sv_id);
153                 return -EEXIST;
154         }
155
156         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
157         if (tsc == NULL)
158                 return -ENOMEM;
159
160         memset(tsc, 0, sizeof(sfw_test_case_t));
161         tsc->tsc_cli_ops     = cliops;
162         tsc->tsc_srv_service = service;
163
164         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
165         return 0;
166 }
167
168 void
169 sfw_add_session_timer (void)
170 {
171         sfw_session_t *sn = sfw_data.fw_session;
172         stt_timer_t   *timer = &sn->sn_timer;
173
174         LASSERT (!sfw_data.fw_shuttingdown);
175
176         if (sn == NULL || sn->sn_timeout == 0)
177                 return;
178
179         LASSERT (!sn->sn_timer_active);
180
181         sn->sn_timer_active = 1;
182         timer->stt_expires = cfs_time_add(sn->sn_timeout,
183                                           cfs_time_current_sec());
184         stt_add_timer(timer);
185         return;
186 }
187
188 int
189 sfw_del_session_timer (void)
190 {
191         sfw_session_t *sn = sfw_data.fw_session;
192
193         if (sn == NULL || !sn->sn_timer_active)
194                 return 0;
195
196         LASSERT (sn->sn_timeout != 0);
197
198         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
199                 sn->sn_timer_active = 0;
200                 return 0;
201         }
202
203 #ifndef __KERNEL__
204         /* Racing is impossible in single-threaded userland selftest */
205         LBUG();
206 #endif
207         return EBUSY; /* racing with sfw_session_expired() */
208 }
209
/*
 * Retire the current session, if there is one.
 *
 * The session is detached from sfw_data.fw_session and parked on the
 * zombie list, the services of all registered tests are aborted, and every
 * still-active batch is force-stopped.  If no batch was active the session
 * is destroyed here; otherwise it stays on the zombie list and is destroyed
 * by sfw_test_unit_done() once its last batch goes idle.
 *
 * The session timer must already be inactive (asserted).
 *
 * Locking: entered and left with sfw_data.fw_lock held, but the lock is
 * dropped and retaken inside, so callers must not rely on state they read
 * before the call.
 */
210 /* called with sfw_data.fw_lock held */
211 static void
212 sfw_deactivate_session (void)
213 __must_hold(&sfw_data.fw_lock)
214 {
215         sfw_session_t *sn = sfw_data.fw_session;
216         int            nactive = 0;
217         sfw_batch_t   *tsb;
218         sfw_test_case_t *tsc;
219
220         if (sn == NULL) return;
221
222         LASSERT(!sn->sn_timer_active);
223
            /* from here on there is no current session; sn is a zombie */
224         sfw_data.fw_session = NULL;
225         atomic_inc(&sfw_data.fw_nzombies);
226         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
227
            /* srpc_abort_service() is called without fw_lock held */
228         spin_unlock(&sfw_data.fw_lock);
229
230         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
231                 srpc_abort_service(tsc->tsc_srv_service);
232         }
233
234         spin_lock(&sfw_data.fw_lock);
235
            /* force-stop (second argument 1) whatever is still running */
236         list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
237                 if (sfw_batch_active(tsb)) {
238                         nactive++;
239                         sfw_stop_batch(tsb, 1);
240                 }
241         }
242
243         if (nactive != 0)
244                 return; /* wait for active batches to stop */
245
            /* nothing was running: take sn off the zombie list and free it
             * with the lock dropped, then restore the caller's lock state */
246         list_del_init(&sn->sn_list);
247         spin_unlock(&sfw_data.fw_lock);
248
249         sfw_destroy_session(sn);
250
251         spin_lock(&sfw_data.fw_lock);
252 }
253
254 #ifndef __KERNEL__
255
256 int
257 sfw_session_removed (void)
258 {
259         return (sfw_data.fw_session == NULL) ? 1 : 0;
260 }
261
262 #endif
263
264 void
265 sfw_session_expired (void *data)
266 {
267         sfw_session_t *sn = data;
268
269         spin_lock(&sfw_data.fw_lock);
270
271         LASSERT (sn->sn_timer_active);
272         LASSERT (sn == sfw_data.fw_session);
273
274         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
275                libcfs_nid2str(sn->sn_id.ses_nid),
276                sn->sn_id.ses_stamp, &sn->sn_name[0]);
277
278         sn->sn_timer_active = 0;
279         sfw_deactivate_session();
280
281         spin_unlock(&sfw_data.fw_lock);
282 }
283
284 static inline void
285 sfw_init_session(sfw_session_t *sn, lst_sid_t sid,
286                  unsigned features, const char *name)
287 {
288         stt_timer_t *timer = &sn->sn_timer;
289
290         memset(sn, 0, sizeof(sfw_session_t));
291         INIT_LIST_HEAD(&sn->sn_list);
292         INIT_LIST_HEAD(&sn->sn_batches);
293         atomic_set(&sn->sn_refcount, 1);        /* +1 for caller */
294         atomic_set(&sn->sn_brw_errors, 0);
295         atomic_set(&sn->sn_ping_errors, 0);
296         strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
297
298         sn->sn_timer_active = 0;
299         sn->sn_id           = sid;
300         sn->sn_features     = features;
301         sn->sn_timeout      = session_timeout;
302         sn->sn_started      = cfs_time_current();
303
304         timer->stt_data = sn;
305         timer->stt_func = sfw_session_expired;
306         INIT_LIST_HEAD(&timer->stt_list);
307 }
308
309 /* completion handler for incoming framework RPCs */
310 void
311 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
312 {
313         struct srpc_service     *sv     = rpc->srpc_scd->scd_svc;
314         int                     status  = rpc->srpc_status;
315
316         CDEBUG (D_NET,
317                 "Incoming framework RPC done: "
318                 "service %s, peer %s, status %s:%d\n",
319                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
320                 swi_state2str(rpc->srpc_wi.swi_state),
321                 status);
322
323         if (rpc->srpc_bulk != NULL)
324                 sfw_free_pages(rpc);
325         return;
326 }
327
328 void
329 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
330 {
331         LASSERT(rpc->crpc_bulk.bk_niov == 0);
332         LASSERT(list_empty(&rpc->crpc_list));
333         LASSERT(atomic_read(&rpc->crpc_refcount) == 0);
334 #ifndef __KERNEL__
335         LASSERT(rpc->crpc_bulk.bk_pages == NULL);
336 #endif
337
338         CDEBUG(D_NET, "Outgoing framework RPC done: "
339                "service %d, peer %s, status %s:%d:%d\n",
340                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
341                swi_state2str(rpc->crpc_wi.swi_state),
342                rpc->crpc_aborted, rpc->crpc_status);
343
344         spin_lock(&sfw_data.fw_lock);
345
346         /* my callers must finish all RPCs before shutting me down */
347         LASSERT(!sfw_data.fw_shuttingdown);
348         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
349
350         spin_unlock(&sfw_data.fw_lock);
351 }
352
353 sfw_batch_t *
354 sfw_find_batch (lst_bid_t bid)
355 {
356         sfw_session_t *sn = sfw_data.fw_session;
357         sfw_batch_t   *bat;
358
359         LASSERT(sn != NULL);
360
361         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
362                 if (bat->bat_id.bat_id == bid.bat_id)
363                         return bat;
364         }
365
366         return NULL;
367 }
368
369 sfw_batch_t *
370 sfw_bid2batch (lst_bid_t bid)
371 {
372         sfw_session_t *sn = sfw_data.fw_session;
373         sfw_batch_t   *bat;
374
375         LASSERT (sn != NULL);
376
377         bat = sfw_find_batch(bid);
378         if (bat != NULL)
379                 return bat;
380
381         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
382         if (bat == NULL) 
383                 return NULL;
384
385         bat->bat_error   = 0;
386         bat->bat_session = sn;
387         bat->bat_id      = bid;
388         atomic_set(&bat->bat_nactive, 0);
389         INIT_LIST_HEAD(&bat->bat_tests);
390
391         list_add_tail(&bat->bat_list, &sn->sn_batches);
392         return bat;
393 }
394
395 int
396 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
397 {
398         sfw_session_t  *sn = sfw_data.fw_session;
399         sfw_counters_t *cnt = &reply->str_fw;
400         sfw_batch_t    *bat;
401         struct timeval  tv;
402
403         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
404
405         if (request->str_sid.ses_nid == LNET_NID_ANY) {
406                 reply->str_status = EINVAL;
407                 return 0;
408         }
409
410         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
411                 reply->str_status = ESRCH;
412                 return 0;
413         }
414
415         lnet_counters_get(&reply->str_lnet);
416         srpc_get_counters(&reply->str_rpc);
417
418         /* send over the msecs since the session was started
419          - with 32 bits to send, this is ~49 days */
420         cfs_duration_usec(cfs_time_sub(cfs_time_current(),
421                                        sn->sn_started), &tv);
422
423         cnt->running_ms      = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
424         cnt->brw_errors      = atomic_read(&sn->sn_brw_errors);
425         cnt->ping_errors     = atomic_read(&sn->sn_ping_errors);
426         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
427
428         cnt->active_batches = 0;
429         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
430                 if (atomic_read(&bat->bat_nactive) > 0)
431                         cnt->active_batches++;
432         }
433
434         reply->str_status = 0;
435         return 0;
436 }
437
438 int
439 sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
440 {
441         sfw_session_t *sn = sfw_data.fw_session;
442         srpc_msg_t    *msg = container_of(request, srpc_msg_t,
443                                           msg_body.mksn_reqst);
444         int            cplen = 0;
445
446         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
447                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
448                 reply->mksn_status = EINVAL;
449                 return 0;
450         }
451
452         if (sn != NULL) {
453                 reply->mksn_status  = 0;
454                 reply->mksn_sid     = sn->sn_id;
455                 reply->mksn_timeout = sn->sn_timeout;
456
457                 if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
458                         atomic_inc(&sn->sn_refcount);
459                         return 0;
460                 }
461
462                 if (!request->mksn_force) {
463                         reply->mksn_status = EBUSY;
464                         cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
465                                         sizeof(reply->mksn_name));
466                         if (cplen >= sizeof(reply->mksn_name))
467                                 return -E2BIG;
468                         return 0;
469                 }
470         }
471
472         /* reject the request if it requires unknown features
473          * NB: old version will always accept all features because it's not
474          * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
475          * harmless because it will return zero feature to console, and it's
476          * console's responsibility to make sure all nodes in a session have
477          * same feature mask. */
478         if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
479                 reply->mksn_status = EPROTO;
480                 return 0;
481         }
482
483         /* brand new or create by force */
484         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
485         if (sn == NULL) {
486                 CERROR("dropping RPC mksn under memory pressure\n");
487                 return -ENOMEM;
488         }
489
490         sfw_init_session(sn, request->mksn_sid,
491                          msg->msg_ses_feats, &request->mksn_name[0]);
492
493         spin_lock(&sfw_data.fw_lock);
494
495         sfw_deactivate_session();
496         LASSERT(sfw_data.fw_session == NULL);
497         sfw_data.fw_session = sn;
498
499         spin_unlock(&sfw_data.fw_lock);
500
501         reply->mksn_status  = 0;
502         reply->mksn_sid     = sn->sn_id;
503         reply->mksn_timeout = sn->sn_timeout;
504         return 0;
505 }
506
507 int
508 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
509 {
510         sfw_session_t *sn = sfw_data.fw_session;
511
512         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
513
514         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
515                 reply->rmsn_status = EINVAL;
516                 return 0;
517         }
518
519         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
520                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
521                 return 0;
522         }
523
524         if (!atomic_dec_and_test(&sn->sn_refcount)) {
525                 reply->rmsn_status = 0;
526                 return 0;
527         }
528
529         spin_lock(&sfw_data.fw_lock);
530         sfw_deactivate_session();
531         spin_unlock(&sfw_data.fw_lock);
532
533         reply->rmsn_status = 0;
534         reply->rmsn_sid    = LST_INVALID_SID;
535         LASSERT(sfw_data.fw_session == NULL);
536         return 0;
537 }
538
539 int
540 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
541 {
542         sfw_session_t *sn = sfw_data.fw_session;
543
544         if (sn == NULL) {
545                 reply->dbg_status = ESRCH;
546                 reply->dbg_sid    = LST_INVALID_SID;
547                 return 0;
548         } 
549
550         reply->dbg_status  = 0;
551         reply->dbg_sid     = sn->sn_id;
552         reply->dbg_timeout = sn->sn_timeout;
553         if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
554             >= sizeof(reply->dbg_name))
555                 return -E2BIG;
556
557         return 0;
558 }
559
560 void
561 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
562 {
563         sfw_test_unit_t     *tsu = rpc->crpc_priv;
564         sfw_test_instance_t *tsi = tsu->tsu_instance;
565
566         /* Called with hold of tsi->tsi_lock */
567         LASSERT(list_empty(&rpc->crpc_list));
568         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
569 }
570
571 static inline int
572 sfw_test_buffers(sfw_test_instance_t *tsi)
573 {
574         struct sfw_test_case    *tsc;
575         struct srpc_service     *svc;
576         int                     nbuf;
577
578         LASSERT(tsi != NULL);
579         tsc = sfw_find_test_case(tsi->tsi_service);
580         LASSERT(tsc != NULL);
581         svc = tsc->tsc_srv_service;
582         LASSERT(svc != NULL);
583
584         nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
585         return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
586 }
587
588 int
589 sfw_load_test(struct sfw_test_instance *tsi)
590 {
591         struct sfw_test_case    *tsc;
592         struct srpc_service     *svc;
593         int                     nbuf;
594         int                     rc;
595
596         LASSERT(tsi != NULL);
597         tsc = sfw_find_test_case(tsi->tsi_service);
598         nbuf = sfw_test_buffers(tsi);
599         LASSERT(tsc != NULL);
600         svc = tsc->tsc_srv_service;
601
602         if (tsi->tsi_is_client) {
603                 tsi->tsi_ops = tsc->tsc_cli_ops;
604                 return 0;
605         }
606
607         rc = srpc_service_add_buffers(svc, nbuf);
608         if (rc != 0) {
609                 CWARN("Failed to reserve enough buffers: "
610                       "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
611                 /* NB: this error handler is not strictly correct, because
612                  * it may release more buffers than already allocated,
613                  * but it doesn't matter because request portal should
614                  * be lazy portal and will grow buffers if necessary. */
615                 srpc_service_remove_buffers(svc, nbuf);
616                 return -ENOMEM;
617         }
618
619         CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
620                nbuf * (srpc_serv_is_framework(svc) ?
621                        1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
622         return 0;
623 }
624
625 void
626 sfw_unload_test(struct sfw_test_instance *tsi)
627 {
628         struct sfw_test_case *tsc;
629
630         LASSERT(tsi != NULL);
631         tsc = sfw_find_test_case(tsi->tsi_service);
632         LASSERT(tsc != NULL);
633
634         if (tsi->tsi_is_client)
635                 return;
636
637         /* shrink buffers, because request portal is lazy portal
638          * which can grow buffers at runtime so we may leave
639          * some buffers behind, but never mind... */
640         srpc_service_remove_buffers(tsc->tsc_srv_service,
641                                     sfw_test_buffers(tsi));
642         return;
643 }
644
645 void
646 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
647 {
648         srpc_client_rpc_t *rpc;
649         sfw_test_unit_t   *tsu;
650
651         if (!tsi->tsi_is_client) goto clean;
652
653         tsi->tsi_ops->tso_fini(tsi);
654
655         LASSERT(!tsi->tsi_stopping);
656         LASSERT(list_empty(&tsi->tsi_active_rpcs));
657         LASSERT(!sfw_test_active(tsi));
658
659         while (!list_empty(&tsi->tsi_units)) {
660                 tsu = list_entry(tsi->tsi_units.next,
661                                  sfw_test_unit_t, tsu_list);
662                 list_del(&tsu->tsu_list);
663                 LIBCFS_FREE(tsu, sizeof(*tsu));
664         }
665
666         while (!list_empty(&tsi->tsi_free_rpcs)) {
667                 rpc = list_entry(tsi->tsi_free_rpcs.next,
668                                  srpc_client_rpc_t, crpc_list);
669                 list_del(&rpc->crpc_list);
670                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
671         }
672
673 clean:
674         sfw_unload_test(tsi);
675         LIBCFS_FREE(tsi, sizeof(*tsi));
676         return;
677 }
678
679 void
680 sfw_destroy_batch (sfw_batch_t *tsb)
681 {
682         sfw_test_instance_t *tsi;
683
684         LASSERT(!sfw_batch_active(tsb));
685         LASSERT(list_empty(&tsb->bat_list));
686
687         while (!list_empty(&tsb->bat_tests)) {
688                 tsi = list_entry(tsb->bat_tests.next,
689                                  sfw_test_instance_t, tsi_list);
690                 list_del_init(&tsi->tsi_list);
691                 sfw_destroy_test_instance(tsi);
692         }
693
694         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
695         return;
696 }
697
698 void
699 sfw_destroy_session (sfw_session_t *sn)
700 {
701         sfw_batch_t *batch;
702
703         LASSERT(list_empty(&sn->sn_list));
704         LASSERT(sn != sfw_data.fw_session);
705
706         while (!list_empty(&sn->sn_batches)) {
707                 batch = list_entry(sn->sn_batches.next,
708                                    sfw_batch_t, bat_list);
709                 list_del_init(&batch->bat_list);
710                 sfw_destroy_batch(batch);
711         }
712
713         LIBCFS_FREE(sn, sizeof(*sn));
714         atomic_dec(&sfw_data.fw_nzombies);
715         return;
716 }
717
718 void
719 sfw_unpack_addtest_req(srpc_msg_t *msg)
720 {
721         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
722
723         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
724         LASSERT (req->tsr_is_client);
725
726         if (msg->msg_magic == SRPC_MSG_MAGIC)
727                 return; /* no flipping needed */
728
729         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
730
731         if (req->tsr_service == SRPC_SERVICE_BRW) {
732                 if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) {
733                         test_bulk_req_t *bulk = &req->tsr_u.bulk_v0;
734
735                         __swab32s(&bulk->blk_opc);
736                         __swab32s(&bulk->blk_npg);
737                         __swab32s(&bulk->blk_flags);
738
739                 } else {
740                         test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1;
741
742                         __swab16s(&bulk->blk_opc);
743                         __swab16s(&bulk->blk_flags);
744                         __swab32s(&bulk->blk_offset);
745                         __swab32s(&bulk->blk_len);
746                 }
747
748                 return;
749         }
750
751         if (req->tsr_service == SRPC_SERVICE_PING) {
752                 test_ping_req_t *ping = &req->tsr_u.ping;
753
754                 __swab32s(&ping->png_size);
755                 __swab32s(&ping->png_flags);
756                 return;
757         }
758
759         LBUG();
760         return;
761 }
762
/*
 * Create a test instance from the "add test" request carried by @rpc and
 * append it to batch @tsb, which must be inactive (asserted).
 *
 * A server-side instance (tsr_is_client == 0) only needs the test loaded.
 * A client-side instance additionally gets tsi_concur test units for each
 * of the tsr_ndest destinations, whose packed process ids are read from
 * the bulk pages of @rpc (SFW_ID_PER_PAGE ids per page), and is then
 * handed to the test's tso_init() hook.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error
 * reported by sfw_load_test() / tso_init().
 *
 * NOTE(review): the shared error path calls sfw_destroy_test_instance(),
 * which invokes tso_fini() even when tso_init() was never reached or
 * failed - presumably the test ops tolerate that; confirm.
 */
763 int
764 sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
765 {
766         srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
767         srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
768         srpc_bulk_t         *bk = rpc->srpc_bulk;
769         int                  ndest = req->tsr_ndest;
770         sfw_test_unit_t     *tsu;
771         sfw_test_instance_t *tsi;
772         int                  i;
773         int                  rc;
774
775         LIBCFS_ALLOC(tsi, sizeof(*tsi));
776         if (tsi == NULL) {
777                 CERROR ("Can't allocate test instance for batch: "LPU64"\n",
778                         tsb->bat_id.bat_id);
779                 return -ENOMEM;
780         }
781
782         memset(tsi, 0, sizeof(*tsi));
783         spin_lock_init(&tsi->tsi_lock);
784         atomic_set(&tsi->tsi_nactive, 0);
785         INIT_LIST_HEAD(&tsi->tsi_units);
786         INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
787         INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
788
            /* copy the generic test parameters out of the request */
789         tsi->tsi_stopping      = 0;
790         tsi->tsi_batch         = tsb;
791         tsi->tsi_loop          = req->tsr_loop;
792         tsi->tsi_concur        = req->tsr_concur;
793         tsi->tsi_service       = req->tsr_service;
794         tsi->tsi_is_client     = !!(req->tsr_is_client);
795         tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
796
            /* the test was not loaded, so only the instance itself is freed */
797         rc = sfw_load_test(tsi);
798         if (rc != 0) {
799                 LIBCFS_FREE(tsi, sizeof(*tsi));
800                 return rc;
801         }
802
803         LASSERT (!sfw_batch_active(tsb));
804
805         if (!tsi->tsi_is_client) {
806                 /* it's test server, just add it to tsb */
807                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
808                 return 0;
809         }
810
            /* client side: the bulk buffer must hold all ndest packed ids */
811         LASSERT (bk != NULL);
812 #ifndef __KERNEL__
813         LASSERT (bk->bk_pages != NULL);
814 #endif
815         LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
816         LASSERT((unsigned int)bk->bk_len >=
817                 sizeof(lnet_process_id_packed_t) * ndest);
818
            /* fix the byte order of the test-specific arguments, then keep a copy */
819         sfw_unpack_addtest_req(msg);
820         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
821
822         for (i = 0; i < ndest; i++) {
823                 lnet_process_id_packed_t *dests;
824                 lnet_process_id_packed_t  id;
825                 int                       j;
826
                    /* locate the page that holds the i-th destination id */
827 #ifdef __KERNEL__
828                 dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
829                 LASSERT (dests != NULL);  /* my pages are within KVM always */
830 #else
831                 dests = page_address(bk->bk_pages[i / SFW_ID_PER_PAGE]);
832 #endif
833                 id = dests[i % SFW_ID_PER_PAGE];
                    /* sender has the opposite byte order: swap our copy of the id */
834                 if (msg->msg_magic != SRPC_MSG_MAGIC)
835                         sfw_unpack_id(id);
836
                    /* tsi_concur test units per destination */
837                 for (j = 0; j < tsi->tsi_concur; j++) {
838                         LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
839                         if (tsu == NULL) {
840                                 rc = -ENOMEM;
841                                 CERROR ("Can't allocate tsu for %d\n",
842                                         tsi->tsi_service);
843                                 goto error;
844                         }
845
846                         tsu->tsu_dest.nid = id.nid;
847                         tsu->tsu_dest.pid = id.pid;
848                         tsu->tsu_instance = tsi;
849                         tsu->tsu_private  = NULL;
850                         list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
851                 }
852         }
853
854         rc = tsi->tsi_ops->tso_init(tsi);
855         if (rc == 0) {
856                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
857                 return 0;
858         }
859
860 error:
            /* frees the units created so far, unloads the test, frees tsi */
861         LASSERT(rc != 0);
862         sfw_destroy_test_instance(tsi);
863         return rc;
864 }
865
/*
 * Account for one test unit of an active test instance finishing.
 *
 * When the last unit of the instance is done, the instance's stopping flag
 * is cleared and the batch's active count is dropped.  If that makes the
 * batch idle and its session is no longer the current one (it is a
 * zombie), the session is destroyed as soon as none of its batches is
 * active any more.
 */
866 static void
867 sfw_test_unit_done (sfw_test_unit_t *tsu)
868 {
869         sfw_test_instance_t *tsi = tsu->tsu_instance;
870         sfw_batch_t         *tsb = tsi->tsi_batch;
871         sfw_session_t       *sn = tsb->bat_session;
872
873         LASSERT (sfw_test_active(tsi));
874
            /* other units of this instance are still running */
875         if (!atomic_dec_and_test(&tsi->tsi_nactive))
876                 return;
877
878         /* the test instance is done */
879         spin_lock(&tsi->tsi_lock);
880
881         tsi->tsi_stopping = 0;
882
883         spin_unlock(&tsi->tsi_lock);
884
885         spin_lock(&sfw_data.fw_lock);
886
887         if (!atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */
888             sn == sfw_data.fw_session) {                  /* sn also active */
889                 spin_unlock(&sfw_data.fw_lock);
890                 return;
891         }
892
893         LASSERT(!list_empty(&sn->sn_list)); /* I'm a zombie! */
894
            /* tsb is reused as the cursor: any active batch keeps the zombie alive */
895         list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
896                 if (sfw_batch_active(tsb)) {
897                         spin_unlock(&sfw_data.fw_lock);
898                         return;
899                 }
900         }
901
            /* last activity of a zombie session: unlink it and destroy it
             * with fw_lock released */
902         list_del_init(&sn->sn_list);
903         spin_unlock(&sfw_data.fw_lock);
904
905         sfw_destroy_session(sn);
906         return;
907 }
908
909 void
910 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
911 {
912         sfw_test_unit_t     *tsu = rpc->crpc_priv;
913         sfw_test_instance_t *tsi = tsu->tsu_instance;
914         int                  done = 0;
915
916         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
917
918         spin_lock(&tsi->tsi_lock);
919
920         LASSERT(sfw_test_active(tsi));
921         LASSERT(!list_empty(&rpc->crpc_list));
922
923         list_del_init(&rpc->crpc_list);
924
925         /* batch is stopping or loop is done or get error */
926         if (tsi->tsi_stopping ||
927             tsu->tsu_loop == 0 ||
928             (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
929                 done = 1;
930
931         /* dec ref for poster */
932         srpc_client_rpc_decref(rpc);
933
934         spin_unlock(&tsi->tsi_lock);
935
936         if (!done) {
937                 swi_schedule_workitem(&tsu->tsu_worker);
938                 return;
939         }
940
941         sfw_test_unit_done(tsu);
942         return;
943 }
944
945 int
946 sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer,
947                     unsigned features, int nblk, int blklen,
948                     srpc_client_rpc_t **rpcpp)
949 {
950         srpc_client_rpc_t   *rpc = NULL;
951         sfw_test_instance_t *tsi = tsu->tsu_instance;
952
953         spin_lock(&tsi->tsi_lock);
954
955         LASSERT (sfw_test_active(tsi));
956
957         if (!list_empty(&tsi->tsi_free_rpcs)) {
958                 /* pick request from buffer */
959                 rpc = list_entry(tsi->tsi_free_rpcs.next,
960                                  srpc_client_rpc_t, crpc_list);
961                 LASSERT(nblk == rpc->crpc_bulk.bk_niov);
962                 list_del_init(&rpc->crpc_list);
963         }
964
965         spin_unlock(&tsi->tsi_lock);
966
967         if (rpc == NULL) {
968                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
969                                              blklen, sfw_test_rpc_done,
970                                              sfw_test_rpc_fini, tsu);
971         } else {
972                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
973                                      blklen, sfw_test_rpc_done,
974                                      sfw_test_rpc_fini, tsu);
975         }
976
977         if (rpc == NULL) {
978                 CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
979                 return -ENOMEM;
980         }
981
982         rpc->crpc_reqstmsg.msg_ses_feats = features;
983         *rpcpp = rpc;
984
985         return 0;
986 }
987
988 int
989 sfw_run_test (swi_workitem_t *wi)
990 {
991         sfw_test_unit_t     *tsu = wi->swi_workitem.wi_data;
992         sfw_test_instance_t *tsi = tsu->tsu_instance;
993         srpc_client_rpc_t   *rpc = NULL;
994
995         LASSERT (wi == &tsu->tsu_worker);
996
997         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
998                 LASSERT (rpc == NULL);
999                 goto test_done;
1000         }
1001
1002         LASSERT (rpc != NULL);
1003
1004         spin_lock(&tsi->tsi_lock);
1005
1006         if (tsi->tsi_stopping) {
1007                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
1008                 spin_unlock(&tsi->tsi_lock);
1009                 goto test_done;
1010         }
1011
1012         if (tsu->tsu_loop > 0)
1013                 tsu->tsu_loop--;
1014
1015         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
1016         spin_unlock(&tsi->tsi_lock);
1017
1018         spin_lock(&rpc->crpc_lock);
1019         rpc->crpc_timeout = rpc_timeout;
1020         srpc_post_rpc(rpc);
1021         spin_unlock(&rpc->crpc_lock);
1022         return 0;
1023
1024 test_done:
1025         /*
1026          * No one can schedule me now since:
1027          * - previous RPC, if any, has done and
1028          * - no new RPC is initiated.
1029          * - my batch is still active; no one can run it again now.
1030          * Cancel pending schedules and prevent future schedule attempts:
1031          */
1032         swi_exit_workitem(wi);
1033         sfw_test_unit_done(tsu);
1034         return 1;
1035 }
1036
1037 int
1038 sfw_run_batch (sfw_batch_t *tsb)
1039 {
1040         swi_workitem_t      *wi;
1041         sfw_test_unit_t     *tsu;
1042         sfw_test_instance_t *tsi;
1043
1044         if (sfw_batch_active(tsb)) {
1045                 CDEBUG(D_NET, "Batch already active: "LPU64" (%d)\n",
1046                        tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
1047                 return 0;
1048         }
1049
1050         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1051                 if (!tsi->tsi_is_client)        /* skip server instances */
1052                         continue;
1053
1054                 LASSERT(!tsi->tsi_stopping);
1055                 LASSERT(!sfw_test_active(tsi));
1056
1057                 atomic_inc(&tsb->bat_nactive);
1058
1059                 list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
1060                         atomic_inc(&tsi->tsi_nactive);
1061                         tsu->tsu_loop = tsi->tsi_loop;
1062                         wi = &tsu->tsu_worker;
1063                         swi_init_workitem(wi, tsu, sfw_run_test,
1064                                           lst_sched_test[\
1065                                           lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1066                         swi_schedule_workitem(wi);
1067                 }
1068         }
1069
1070         return 0;
1071 }
1072
1073 int
1074 sfw_stop_batch (sfw_batch_t *tsb, int force)
1075 {
1076         sfw_test_instance_t *tsi;
1077         srpc_client_rpc_t   *rpc;
1078
1079         if (!sfw_batch_active(tsb)) {
1080                 CDEBUG(D_NET, "Batch "LPU64" inactive\n", tsb->bat_id.bat_id);
1081                 return 0;
1082         }
1083
1084         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1085                 spin_lock(&tsi->tsi_lock);
1086
1087                 if (!tsi->tsi_is_client ||
1088                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
1089                         spin_unlock(&tsi->tsi_lock);
1090                         continue;
1091                 }
1092
1093                 tsi->tsi_stopping = 1;
1094
1095                 if (!force) {
1096                         spin_unlock(&tsi->tsi_lock);
1097                         continue;
1098                 }
1099
1100                 /* abort launched rpcs in the test */
1101                 list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
1102                         spin_lock(&rpc->crpc_lock);
1103
1104                         srpc_abort_rpc(rpc, -EINTR);
1105
1106                         spin_unlock(&rpc->crpc_lock);
1107                 }
1108
1109                 spin_unlock(&tsi->tsi_lock);
1110         }
1111
1112         return 0;
1113 }
1114
1115 int
1116 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
1117 {
1118         sfw_test_instance_t *tsi;
1119
1120         if (testidx < 0)
1121                 return -EINVAL;
1122
1123         if (testidx == 0) {
1124                 reply->bar_active = atomic_read(&tsb->bat_nactive);
1125                 return 0;
1126         }
1127
1128         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1129                 if (testidx-- > 1)
1130                         continue;
1131
1132                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
1133                 return 0;
1134         }
1135
1136         return -ENOENT;
1137 }
1138
1139 void
1140 sfw_free_pages (srpc_server_rpc_t *rpc)
1141 {
1142         srpc_free_bulk(rpc->srpc_bulk);
1143         rpc->srpc_bulk = NULL;
1144 }
1145
1146 int
1147 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1148                 int sink)
1149 {
1150         LASSERT(rpc->srpc_bulk == NULL);
1151         LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1152
1153         rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1154         if (rpc->srpc_bulk == NULL)
1155                 return -ENOMEM;
1156
1157         return 0;
1158 }
1159
1160 int
1161 sfw_add_test (srpc_server_rpc_t *rpc)
1162 {
1163         sfw_session_t     *sn = sfw_data.fw_session;
1164         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1165         srpc_test_reqst_t *request;
1166         int                rc;
1167         sfw_batch_t       *bat;
1168
1169         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1170         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1171
1172         if (request->tsr_loop == 0 ||
1173             request->tsr_concur == 0 ||
1174             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1175             request->tsr_ndest > SFW_MAX_NDESTS ||
1176             (request->tsr_is_client && request->tsr_ndest == 0) ||
1177             request->tsr_concur > SFW_MAX_CONCUR ||
1178             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1179             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1180                 reply->tsr_status = EINVAL;
1181                 return 0;
1182         }
1183
1184         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1185             sfw_find_test_case(request->tsr_service) == NULL) {
1186                 reply->tsr_status = ENOENT;
1187                 return 0;
1188         }
1189
1190         bat = sfw_bid2batch(request->tsr_bid);
1191         if (bat == NULL) {
1192                 CERROR("dropping RPC %s from %s under memory pressure\n",
1193                         rpc->srpc_scd->scd_svc->sv_name,
1194                         libcfs_id2str(rpc->srpc_peer));
1195                 return -ENOMEM;
1196         }
1197
1198         if (sfw_batch_active(bat)) {
1199                 reply->tsr_status = EBUSY;
1200                 return 0;
1201         }
1202
1203         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1204                 /* rpc will be resumed later in sfw_bulk_ready */
1205                 int     npg = sfw_id_pages(request->tsr_ndest);
1206                 int     len;
1207
1208                 if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) {
1209                         len = npg * PAGE_CACHE_SIZE;
1210
1211                 } else  {
1212                         len = sizeof(lnet_process_id_packed_t) *
1213                               request->tsr_ndest;
1214                 }
1215
1216                 return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1217         }
1218
1219         rc = sfw_add_test_instance(bat, rpc);
1220         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1221                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1222                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1223                 request->tsr_is_client ? "client" : "server",
1224                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1225
1226         reply->tsr_status = (rc < 0) ? -rc : rc;
1227         return 0;
1228 }
1229
1230 int
1231 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1232 {
1233         sfw_session_t *sn = sfw_data.fw_session;
1234         int            rc = 0;
1235         sfw_batch_t   *bat;
1236
1237         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1238
1239         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1240                 reply->bar_status = ESRCH;
1241                 return 0;
1242         }
1243
1244         bat = sfw_find_batch(request->bar_bid);
1245         if (bat == NULL) {
1246                 reply->bar_status = ENOENT;
1247                 return 0;
1248         }
1249
1250         switch (request->bar_opc) {
1251         case SRPC_BATCH_OPC_RUN:
1252                 rc = sfw_run_batch(bat);
1253                 break;
1254
1255         case SRPC_BATCH_OPC_STOP:
1256                 rc = sfw_stop_batch(bat, request->bar_arg);
1257                 break;
1258
1259         case SRPC_BATCH_OPC_QUERY:
1260                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1261                 break;
1262
1263         default:
1264                 return -EINVAL; /* drop it */
1265         }
1266
1267         reply->bar_status = (rc < 0) ? -rc : rc;
1268         return 0;
1269 }
1270
1271 int
1272 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1273 {
1274         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1275         srpc_msg_t     *reply   = &rpc->srpc_replymsg;
1276         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1277         unsigned        features = LST_FEATS_MASK;
1278         int             rc = 0;
1279
1280         LASSERT(sfw_data.fw_active_srpc == NULL);
1281         LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1282
1283         spin_lock(&sfw_data.fw_lock);
1284
1285         if (sfw_data.fw_shuttingdown) {
1286                 spin_unlock(&sfw_data.fw_lock);
1287                 return -ESHUTDOWN;
1288         }
1289
1290         /* Remove timer to avoid racing with it or expiring active session */
1291         if (sfw_del_session_timer() != 0) {
1292                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1293                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1294                 spin_unlock(&sfw_data.fw_lock);
1295                 return -EAGAIN;
1296         }
1297
1298         sfw_data.fw_active_srpc = rpc;
1299         spin_unlock(&sfw_data.fw_lock);
1300
1301         sfw_unpack_message(request);
1302         LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1303
1304         /* rpc module should have checked this */
1305         LASSERT(request->msg_version == SRPC_MSG_VERSION);
1306
1307         if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1308             sv->sv_id != SRPC_SERVICE_DEBUG) {
1309                 sfw_session_t *sn = sfw_data.fw_session;
1310
1311                 if (sn != NULL &&
1312                     sn->sn_features != request->msg_ses_feats) {
1313                         CNETERR("Features of framework RPC don't match "
1314                                 "features of current session: %x/%x\n",
1315                                 request->msg_ses_feats, sn->sn_features);
1316                         reply->msg_body.reply.status = EPROTO;
1317                         reply->msg_body.reply.sid    = sn->sn_id;
1318                         goto out;
1319                 }
1320
1321         } else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
1322                 /* NB: at this point, old version will ignore features and
1323                  * create new session anyway, so console should be able
1324                  * to handle this */
1325                 reply->msg_body.reply.status = EPROTO;
1326                 goto out;
1327         }
1328
1329         switch(sv->sv_id) {
1330         default:
1331                 LBUG ();
1332         case SRPC_SERVICE_TEST:
1333                 rc = sfw_add_test(rpc);
1334                 break;
1335
1336         case SRPC_SERVICE_BATCH:
1337                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1338                                        &reply->msg_body.bat_reply);
1339                 break;
1340
1341         case SRPC_SERVICE_QUERY_STAT:
1342                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1343                                    &reply->msg_body.stat_reply);
1344                 break;
1345
1346         case SRPC_SERVICE_DEBUG:
1347                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1348                                        &reply->msg_body.dbg_reply);
1349                 break;
1350
1351         case SRPC_SERVICE_MAKE_SESSION:
1352                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1353                                       &reply->msg_body.mksn_reply);
1354                 break;
1355
1356         case SRPC_SERVICE_REMOVE_SESSION:
1357                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1358                                         &reply->msg_body.rmsn_reply);
1359                 break;
1360         }
1361
1362         if (sfw_data.fw_session != NULL)
1363                 features = sfw_data.fw_session->sn_features;
1364  out:
1365         reply->msg_ses_feats = features;
1366         rpc->srpc_done = sfw_server_rpc_done;
1367         spin_lock(&sfw_data.fw_lock);
1368
1369 #ifdef __KERNEL__
1370         if (!sfw_data.fw_shuttingdown)
1371                 sfw_add_session_timer();
1372 #else
1373         LASSERT(!sfw_data.fw_shuttingdown);
1374         sfw_add_session_timer();
1375 #endif
1376
1377         sfw_data.fw_active_srpc = NULL;
1378         spin_unlock(&sfw_data.fw_lock);
1379         return rc;
1380 }
1381
1382 int
1383 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1384 {
1385         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1386         int                     rc;
1387
1388         LASSERT(rpc->srpc_bulk != NULL);
1389         LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1390         LASSERT(sfw_data.fw_active_srpc == NULL);
1391         LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1392
1393         spin_lock(&sfw_data.fw_lock);
1394
1395         if (status != 0) {
1396                 CERROR("Bulk transfer failed for RPC: "
1397                        "service %s, peer %s, status %d\n",
1398                        sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1399                 spin_unlock(&sfw_data.fw_lock);
1400                 return -EIO;
1401         }
1402
1403         if (sfw_data.fw_shuttingdown) {
1404                 spin_unlock(&sfw_data.fw_lock);
1405                 return -ESHUTDOWN;
1406         }
1407
1408         if (sfw_del_session_timer() != 0) {
1409                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1410                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1411                 spin_unlock(&sfw_data.fw_lock);
1412                 return -EAGAIN;
1413         }
1414
1415         sfw_data.fw_active_srpc = rpc;
1416         spin_unlock(&sfw_data.fw_lock);
1417
1418         rc = sfw_add_test(rpc);
1419
1420         spin_lock(&sfw_data.fw_lock);
1421
1422 #ifdef __KERNEL__
1423         if (!sfw_data.fw_shuttingdown)
1424                 sfw_add_session_timer();
1425 #else
1426         LASSERT(!sfw_data.fw_shuttingdown);
1427         sfw_add_session_timer();
1428 #endif
1429
1430         sfw_data.fw_active_srpc = NULL;
1431         spin_unlock(&sfw_data.fw_lock);
1432         return rc;
1433 }
1434
1435 srpc_client_rpc_t *
1436 sfw_create_rpc(lnet_process_id_t peer, int service,
1437                unsigned features, int nbulkiov, int bulklen,
1438                void (*done)(srpc_client_rpc_t *), void *priv)
1439 {
1440         srpc_client_rpc_t *rpc = NULL;
1441
1442         spin_lock(&sfw_data.fw_lock);
1443
1444         LASSERT (!sfw_data.fw_shuttingdown);
1445         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1446
1447         if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1448                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1449                                      srpc_client_rpc_t, crpc_list);
1450                 list_del(&rpc->crpc_list);
1451
1452                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1453                                      done, sfw_client_rpc_fini, priv);
1454         }
1455
1456         spin_unlock(&sfw_data.fw_lock);
1457
1458         if (rpc == NULL) {
1459                 rpc = srpc_create_client_rpc(peer, service,
1460                                              nbulkiov, bulklen, done,
1461                                              nbulkiov != 0 ?  NULL :
1462                                              sfw_client_rpc_fini,
1463                                              priv);
1464         }
1465
1466         if (rpc != NULL) /* "session" is concept in framework */
1467                 rpc->crpc_reqstmsg.msg_ses_feats = features;
1468
1469         return rpc;
1470 }
1471
1472 void
1473 sfw_unpack_message (srpc_msg_t *msg)
1474 {
1475         if (msg->msg_magic == SRPC_MSG_MAGIC)
1476                 return; /* no flipping needed */
1477
1478         /* srpc module should guarantee I wouldn't get crap */
1479         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1480
1481         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1482                 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1483
1484                 __swab32s(&req->str_type);
1485                 __swab64s(&req->str_rpyid);
1486                 sfw_unpack_sid(req->str_sid);
1487                 return;
1488         }
1489
1490         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1491                 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1492
1493                 __swab32s(&rep->str_status);
1494                 sfw_unpack_sid(rep->str_sid);
1495                 sfw_unpack_fw_counters(rep->str_fw);
1496                 sfw_unpack_rpc_counters(rep->str_rpc);
1497                 sfw_unpack_lnet_counters(rep->str_lnet);
1498                 return;
1499         }
1500
1501         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1502                 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1503
1504                 __swab64s(&req->mksn_rpyid);
1505                 __swab32s(&req->mksn_force);
1506                 sfw_unpack_sid(req->mksn_sid);
1507                 return;
1508         }
1509
1510         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1511                 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1512
1513                 __swab32s(&rep->mksn_status);
1514                 __swab32s(&rep->mksn_timeout);
1515                 sfw_unpack_sid(rep->mksn_sid);
1516                 return;
1517         }
1518
1519         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1520                 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1521
1522                 __swab64s(&req->rmsn_rpyid);
1523                 sfw_unpack_sid(req->rmsn_sid);
1524                 return;
1525         }
1526
1527         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1528                 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1529
1530                 __swab32s(&rep->rmsn_status);
1531                 sfw_unpack_sid(rep->rmsn_sid);
1532                 return;
1533         }
1534
1535         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1536                 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1537
1538                 __swab64s(&req->dbg_rpyid);
1539                 __swab32s(&req->dbg_flags);
1540                 sfw_unpack_sid(req->dbg_sid);
1541                 return;
1542         }
1543
1544         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1545                 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1546
1547                 __swab32s(&rep->dbg_nbatch);
1548                 __swab32s(&rep->dbg_timeout);
1549                 sfw_unpack_sid(rep->dbg_sid);
1550                 return;
1551         }
1552
1553         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1554                 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1555
1556                 __swab32s(&req->bar_opc);
1557                 __swab64s(&req->bar_rpyid);
1558                 __swab32s(&req->bar_testidx);
1559                 __swab32s(&req->bar_arg);
1560                 sfw_unpack_sid(req->bar_sid);
1561                 __swab64s(&req->bar_bid.bat_id);
1562                 return;
1563         }
1564
1565         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1566                 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1567
1568                 __swab32s(&rep->bar_status);
1569                 sfw_unpack_sid(rep->bar_sid);
1570                 return;
1571         }
1572
1573         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1574                 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1575
1576                 __swab64s(&req->tsr_rpyid);
1577                 __swab64s(&req->tsr_bulkid);
1578                 __swab32s(&req->tsr_loop);
1579                 __swab32s(&req->tsr_ndest);
1580                 __swab32s(&req->tsr_concur);
1581                 __swab32s(&req->tsr_service);
1582                 sfw_unpack_sid(req->tsr_sid);
1583                 __swab64s(&req->tsr_bid.bat_id);
1584                 return;
1585         }
1586
1587         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1588                 srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1589
1590                 __swab32s(&rep->tsr_status);
1591                 sfw_unpack_sid(rep->tsr_sid);
1592                 return;
1593         }
1594
1595         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1596                 srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1597
1598                 __swab64s(&req->join_rpyid);
1599                 sfw_unpack_sid(req->join_sid);
1600                 return;
1601         }
1602
1603         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1604                 srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1605
1606                 __swab32s(&rep->join_status);
1607                 __swab32s(&rep->join_timeout);
1608                 sfw_unpack_sid(rep->join_sid);
1609                 return;
1610         }
1611
1612         LBUG ();
1613         return;
1614 }
1615
1616 void
1617 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1618 {
1619         LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1620         LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1621
1622         spin_lock(&rpc->crpc_lock);
1623         srpc_abort_rpc(rpc, -EINTR);
1624         spin_unlock(&rpc->crpc_lock);
1625         return;
1626 }
1627
1628 void
1629 sfw_post_rpc (srpc_client_rpc_t *rpc)
1630 {
1631         spin_lock(&rpc->crpc_lock);
1632
1633         LASSERT(!rpc->crpc_closed);
1634         LASSERT(!rpc->crpc_aborted);
1635         LASSERT(list_empty(&rpc->crpc_list));
1636         LASSERT(!sfw_data.fw_shuttingdown);
1637
1638         rpc->crpc_timeout = rpc_timeout;
1639         srpc_post_rpc(rpc);
1640
1641         spin_unlock(&rpc->crpc_lock);
1642         return;
1643 }
1644
1645 static srpc_service_t sfw_services[] = 
1646 {
1647         {
1648                 /* sv_id */    SRPC_SERVICE_DEBUG,
1649                 /* sv_name */  "debug",
1650                 0
1651         },
1652         {
1653                 /* sv_id */    SRPC_SERVICE_QUERY_STAT,
1654                 /* sv_name */  "query stats",
1655                 0
1656         },
1657         {
1658                 /* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1659                 /* sv_name */  "make session",
1660                 0
1661         },
1662         {
1663                 /* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1664                 /* sv_name */  "remove session",
1665                 0
1666         },
1667         {
1668                 /* sv_id */    SRPC_SERVICE_BATCH,
1669                 /* sv_name */  "batch service",
1670                 0
1671         },
1672         {
1673                 /* sv_id */    SRPC_SERVICE_TEST,
1674                 /* sv_name */  "test service",
1675                 0
1676         },
1677         {
1678                 /* sv_id */    0,
1679                 /* sv_name */  NULL,
1680                 0
1681         }
1682 };
1683
1684 extern sfw_test_client_ops_t ping_test_client;
1685 extern srpc_service_t        ping_test_service;
1686 extern void ping_init_test_client(void);
1687 extern void ping_init_test_service(void);
1688
1689 extern sfw_test_client_ops_t brw_test_client;
1690 extern srpc_service_t        brw_test_service;
1691 extern void brw_init_test_client(void);
1692 extern void brw_init_test_service(void);
1693
1694
1695 int
1696 sfw_startup (void)
1697 {
1698         int              i;
1699         int              rc;
1700         int              error;
1701         srpc_service_t  *sv;
1702         sfw_test_case_t *tsc;
1703
1704 #ifndef __KERNEL__
1705         char *s;
1706
1707         s = getenv("SESSION_TIMEOUT");
1708         session_timeout = s != NULL ? atoi(s) : session_timeout;
1709
1710         s = getenv("RPC_TIMEOUT");
1711         rpc_timeout = s != NULL ? atoi(s) : rpc_timeout;
1712 #endif
1713
1714         if (session_timeout < 0) {
1715                 CERROR ("Session timeout must be non-negative: %d\n",
1716                         session_timeout);
1717                 return -EINVAL;
1718         }
1719
1720         if (rpc_timeout < 0) {
1721                 CERROR ("RPC timeout must be non-negative: %d\n",
1722                         rpc_timeout);
1723                 return -EINVAL;
1724         }
1725
1726         if (session_timeout == 0)
1727                 CWARN ("Zero session_timeout specified "
1728                        "- test sessions never expire.\n");
1729
1730         if (rpc_timeout == 0)
1731                 CWARN ("Zero rpc_timeout specified "
1732                        "- test RPC never expire.\n");
1733
1734         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1735
1736         sfw_data.fw_session     = NULL;
1737         sfw_data.fw_active_srpc = NULL;
1738         spin_lock_init(&sfw_data.fw_lock);
1739         atomic_set(&sfw_data.fw_nzombies, 0);
1740         INIT_LIST_HEAD(&sfw_data.fw_tests);
1741         INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1742         INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1743
1744         brw_init_test_client();
1745         brw_init_test_service();
1746         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1747         LASSERT (rc == 0);
1748
1749         ping_init_test_client();
1750         ping_init_test_service();
1751         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1752         LASSERT (rc == 0);
1753
1754         error = 0;
1755         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1756                 sv = tsc->tsc_srv_service;
1757
1758                 rc = srpc_add_service(sv);
1759                 LASSERT(rc != -EBUSY);
1760                 if (rc != 0) {
1761                         CWARN("Failed to add %s service: %d\n",
1762                               sv->sv_name, rc);
1763                         error = rc;
1764                 }
1765         }
1766
1767         for (i = 0; ; i++) {
1768                 sv = &sfw_services[i];
1769                 if (sv->sv_name == NULL) break;
1770
1771                 sv->sv_bulk_ready = NULL;
1772                 sv->sv_handler    = sfw_handle_server_rpc;
1773                 sv->sv_wi_total   = SFW_FRWK_WI_MAX;
1774                 if (sv->sv_id == SRPC_SERVICE_TEST)
1775                         sv->sv_bulk_ready = sfw_bulk_ready;
1776
1777                 rc = srpc_add_service(sv);
1778                 LASSERT (rc != -EBUSY);
1779                 if (rc != 0) {
1780                         CWARN ("Failed to add %s service: %d\n",
1781                                sv->sv_name, rc);
1782                         error = rc;
1783                 }
1784
1785                 /* about to sfw_shutdown, no need to add buffer */
1786                 if (error) continue;
1787
1788                 rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1789                 if (rc != 0) {
1790                         CWARN("Failed to reserve enough buffers: "
1791                               "service %s, %d needed: %d\n",
1792                               sv->sv_name, sv->sv_wi_total, rc);
1793                         error = -ENOMEM;
1794                 }
1795         }
1796
1797         if (error != 0)
1798                 sfw_shutdown();
1799         return error;
1800 }
1801
1802 void
1803 sfw_shutdown (void)
1804 {
1805         srpc_service_t  *sv;
1806         sfw_test_case_t *tsc;
1807         int              i;
1808
1809         spin_lock(&sfw_data.fw_lock);
1810
1811         sfw_data.fw_shuttingdown = 1;
1812 #ifdef __KERNEL__
1813         lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1814                        "waiting for active RPC to finish.\n");
1815 #else
1816         LASSERT (sfw_data.fw_active_srpc == NULL);
1817 #endif
1818
1819         if (sfw_del_session_timer() != 0)
1820                 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1821                                "waiting for session timer to explode.\n");
1822
1823         sfw_deactivate_session();
1824         lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0,
1825                        sfw_data.fw_lock,
1826                        "waiting for %d zombie sessions to die.\n",
1827                        atomic_read(&sfw_data.fw_nzombies));
1828
1829         spin_unlock(&sfw_data.fw_lock);
1830
1831         for (i = 0; ; i++) {
1832                 sv = &sfw_services[i];
1833                 if (sv->sv_name == NULL)
1834                         break;
1835
1836                 srpc_shutdown_service(sv);
1837                 srpc_remove_service(sv);
1838         }
1839
1840         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1841                 sv = tsc->tsc_srv_service;
1842                 srpc_shutdown_service(sv);
1843                 srpc_remove_service(sv);
1844         }
1845
1846         while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1847                 srpc_client_rpc_t *rpc;
1848
1849                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1850                                  srpc_client_rpc_t, crpc_list);
1851                 list_del(&rpc->crpc_list);
1852
1853                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1854         }
1855
1856         for (i = 0; ; i++) {
1857                 sv = &sfw_services[i];
1858                 if (sv->sv_name == NULL)
1859                         break;
1860
1861                 srpc_wait_service_shutdown(sv);
1862         }
1863
1864         while (!list_empty(&sfw_data.fw_tests)) {
1865                 tsc = list_entry(sfw_data.fw_tests.next,
1866                                  sfw_test_case_t, tsc_list);
1867
1868                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1869
1870                 list_del(&tsc->tsc_list);
1871                 LIBCFS_FREE(tsc, sizeof(*tsc));
1872         }
1873
1874         return;
1875 }