Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lnet / selftest / framework.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/selftest/framework.c
35  *
36  * Author: Isaac Huang <isaac@clusterfs.com>
37  * Author: Liang Zhen  <liangzhen@clusterfs.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_LNET
41
42 #include "selftest.h"
43
44 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
45
46 static int session_timeout = 100;
47 CFS_MODULE_PARM(session_timeout, "i", int, 0444,
48                 "test session timeout in seconds (100 by default, 0 == never)");
49
50 static int rpc_timeout = 64;
51 CFS_MODULE_PARM(rpc_timeout, "i", int, 0644,
52                 "rpc timeout in seconds (64 by default, 0 == never)");
53
54 #define sfw_unpack_id(id)               \
55 do {                                    \
56         __swab64s(&(id).nid);           \
57         __swab32s(&(id).pid);           \
58 } while (0)
59
60 #define sfw_unpack_sid(sid)             \
61 do {                                    \
62         __swab64s(&(sid).ses_nid);      \
63         __swab64s(&(sid).ses_stamp);    \
64 } while (0)
65
66 #define sfw_unpack_fw_counters(fc)        \
67 do {                                      \
68         __swab32s(&(fc).running_ms);      \
69         __swab32s(&(fc).active_batches);  \
70         __swab32s(&(fc).zombie_sessions); \
71         __swab32s(&(fc).brw_errors);      \
72         __swab32s(&(fc).ping_errors);     \
73 } while (0)
74
75 #define sfw_unpack_rpc_counters(rc)     \
76 do {                                    \
77         __swab32s(&(rc).errors);        \
78         __swab32s(&(rc).rpcs_sent);     \
79         __swab32s(&(rc).rpcs_rcvd);     \
80         __swab32s(&(rc).rpcs_dropped);  \
81         __swab32s(&(rc).rpcs_expired);  \
82         __swab64s(&(rc).bulk_get);      \
83         __swab64s(&(rc).bulk_put);      \
84 } while (0)
85
86 #define sfw_unpack_lnet_counters(lc)    \
87 do {                                    \
88         __swab32s(&(lc).errors);        \
89         __swab32s(&(lc).msgs_max);      \
90         __swab32s(&(lc).msgs_alloc);    \
91         __swab32s(&(lc).send_count);    \
92         __swab32s(&(lc).recv_count);    \
93         __swab32s(&(lc).drop_count);    \
94         __swab32s(&(lc).route_count);   \
95         __swab64s(&(lc).send_length);   \
96         __swab64s(&(lc).recv_length);   \
97         __swab64s(&(lc).drop_length);   \
98         __swab64s(&(lc).route_length);  \
99 } while (0)
100
101 #define sfw_test_active(t)      (cfs_atomic_read(&(t)->tsi_nactive) != 0)
102 #define sfw_batch_active(b)     (cfs_atomic_read(&(b)->bat_nactive) != 0)
103
104 struct smoketest_framework {
105         cfs_list_t         fw_zombie_rpcs;     /* RPCs to be recycled */
106         cfs_list_t         fw_zombie_sessions; /* stopping sessions */
107         cfs_list_t         fw_tests;           /* registered test cases */
108         cfs_atomic_t       fw_nzombies;        /* # zombie sessions */
109         spinlock_t         fw_lock;             /* serialise */
110         sfw_session_t     *fw_session;          /* _the_ session */
111         int                fw_shuttingdown;     /* shutdown in progress */
112         srpc_server_rpc_t *fw_active_srpc;      /* running RPC */
113 } sfw_data;
114
115 /* forward ref's */
116 int sfw_stop_batch (sfw_batch_t *tsb, int force);
117 void sfw_destroy_session (sfw_session_t *sn);
118
119 static inline sfw_test_case_t *
120 sfw_find_test_case(int id)
121 {
122         sfw_test_case_t *tsc;
123
124         LASSERT (id <= SRPC_SERVICE_MAX_ID);
125         LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
126
127         cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
128                                        sfw_test_case_t, tsc_list) {
129                 if (tsc->tsc_srv_service->sv_id == id)
130                         return tsc;
131         }
132
133         return NULL;
134 }
135
136 static int
137 sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops)
138 {
139         sfw_test_case_t *tsc;
140
141         if (sfw_find_test_case(service->sv_id) != NULL) {
142                 CERROR ("Failed to register test %s (%d)\n",
143                         service->sv_name, service->sv_id);
144                 return -EEXIST;
145         }
146
147         LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t));
148         if (tsc == NULL)
149                 return -ENOMEM;
150
151         memset(tsc, 0, sizeof(sfw_test_case_t));
152         tsc->tsc_cli_ops     = cliops;
153         tsc->tsc_srv_service = service;
154
155         cfs_list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
156         return 0;
157 }
158
159 void
160 sfw_add_session_timer (void)
161 {
162         sfw_session_t *sn = sfw_data.fw_session;
163         stt_timer_t   *timer = &sn->sn_timer;
164
165         LASSERT (!sfw_data.fw_shuttingdown);
166
167         if (sn == NULL || sn->sn_timeout == 0)
168                 return;
169
170         LASSERT (!sn->sn_timer_active);
171
172         sn->sn_timer_active = 1;
173         timer->stt_expires = cfs_time_add(sn->sn_timeout,
174                                           cfs_time_current_sec());
175         stt_add_timer(timer);
176         return;
177 }
178
179 int
180 sfw_del_session_timer (void)
181 {
182         sfw_session_t *sn = sfw_data.fw_session;
183
184         if (sn == NULL || !sn->sn_timer_active)
185                 return 0;
186
187         LASSERT (sn->sn_timeout != 0);
188
189         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
190                 sn->sn_timer_active = 0;
191                 return 0;
192         }
193
194 #ifndef __KERNEL__
195         /* Racing is impossible in single-threaded userland selftest */
196         LBUG();
197 #endif
198         return EBUSY; /* racing with sfw_session_expired() */
199 }
200
201 /* called with sfw_data.fw_lock held */
202 static void
203 sfw_deactivate_session (void)
204 {
205         sfw_session_t *sn = sfw_data.fw_session;
206         int            nactive = 0;
207         sfw_batch_t   *tsb;
208         sfw_test_case_t *tsc;
209
210         if (sn == NULL) return;
211
212         LASSERT (!sn->sn_timer_active);
213
214         sfw_data.fw_session = NULL;
215         cfs_atomic_inc(&sfw_data.fw_nzombies);
216         cfs_list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
217
218         spin_unlock(&sfw_data.fw_lock);
219
220         cfs_list_for_each_entry_typed(tsc, &sfw_data.fw_tests,
221                                       sfw_test_case_t, tsc_list) {
222                 srpc_abort_service(tsc->tsc_srv_service);
223         }
224
225         spin_lock(&sfw_data.fw_lock);
226
227         cfs_list_for_each_entry_typed (tsb, &sn->sn_batches,
228                                        sfw_batch_t, bat_list) {
229                 if (sfw_batch_active(tsb)) {
230                         nactive++;
231                         sfw_stop_batch(tsb, 1);
232                 }
233         }
234
235         if (nactive != 0)
236                 return;   /* wait for active batches to stop */
237
238         cfs_list_del_init(&sn->sn_list);
239         spin_unlock(&sfw_data.fw_lock);
240
241         sfw_destroy_session(sn);
242
243         spin_lock(&sfw_data.fw_lock);
244 }
245
246 #ifndef __KERNEL__
247
248 int
249 sfw_session_removed (void)
250 {
251         return (sfw_data.fw_session == NULL) ? 1 : 0;
252 }
253
254 #endif
255
256 void
257 sfw_session_expired (void *data)
258 {
259         sfw_session_t *sn = data;
260
261         spin_lock(&sfw_data.fw_lock);
262
263         LASSERT (sn->sn_timer_active);
264         LASSERT (sn == sfw_data.fw_session);
265
266         CWARN ("Session expired! sid: %s-"LPU64", name: %s\n",
267                libcfs_nid2str(sn->sn_id.ses_nid),
268                sn->sn_id.ses_stamp, &sn->sn_name[0]);
269
270         sn->sn_timer_active = 0;
271         sfw_deactivate_session();
272
273         spin_unlock(&sfw_data.fw_lock);
274 }
275
276 static inline void
277 sfw_init_session(sfw_session_t *sn, lst_sid_t sid,
278                  unsigned features, const char *name)
279 {
280         stt_timer_t *timer = &sn->sn_timer;
281
282         memset(sn, 0, sizeof(sfw_session_t));
283         CFS_INIT_LIST_HEAD(&sn->sn_list);
284         CFS_INIT_LIST_HEAD(&sn->sn_batches);
285         cfs_atomic_set(&sn->sn_refcount, 1);        /* +1 for caller */
286         cfs_atomic_set(&sn->sn_brw_errors, 0);
287         cfs_atomic_set(&sn->sn_ping_errors, 0);
288         strncpy(&sn->sn_name[0], name, LST_NAME_SIZE);
289
290         sn->sn_timer_active = 0;
291         sn->sn_id           = sid;
292         sn->sn_features     = features;
293         sn->sn_timeout      = session_timeout;
294         sn->sn_started      = cfs_time_current();
295
296         timer->stt_data = sn;
297         timer->stt_func = sfw_session_expired;
298         CFS_INIT_LIST_HEAD(&timer->stt_list);
299 }
300
301 /* completion handler for incoming framework RPCs */
302 void
303 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
304 {
305         struct srpc_service     *sv     = rpc->srpc_scd->scd_svc;
306         int                     status  = rpc->srpc_status;
307
308         CDEBUG (D_NET,
309                 "Incoming framework RPC done: "
310                 "service %s, peer %s, status %s:%d\n",
311                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
312                 swi_state2str(rpc->srpc_wi.swi_state),
313                 status);
314
315         if (rpc->srpc_bulk != NULL)
316                 sfw_free_pages(rpc);
317         return;
318 }
319
320 void
321 sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
322 {
323         LASSERT (rpc->crpc_bulk.bk_niov == 0);
324         LASSERT (cfs_list_empty(&rpc->crpc_list));
325         LASSERT (cfs_atomic_read(&rpc->crpc_refcount) == 0);
326 #ifndef __KERNEL__
327         LASSERT (rpc->crpc_bulk.bk_pages == NULL);
328 #endif
329
330         CDEBUG (D_NET,
331                 "Outgoing framework RPC done: "
332                 "service %d, peer %s, status %s:%d:%d\n",
333                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
334                 swi_state2str(rpc->crpc_wi.swi_state),
335                 rpc->crpc_aborted, rpc->crpc_status);
336
337         spin_lock(&sfw_data.fw_lock);
338
339         /* my callers must finish all RPCs before shutting me down */
340         LASSERT(!sfw_data.fw_shuttingdown);
341         cfs_list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
342
343         spin_unlock(&sfw_data.fw_lock);
344 }
345
346 sfw_batch_t *
347 sfw_find_batch (lst_bid_t bid)
348 {
349         sfw_session_t *sn = sfw_data.fw_session;
350         sfw_batch_t   *bat;
351
352         LASSERT (sn != NULL);
353
354         cfs_list_for_each_entry_typed (bat, &sn->sn_batches,
355                                        sfw_batch_t, bat_list) {
356                 if (bat->bat_id.bat_id == bid.bat_id)
357                         return bat;
358         }
359
360         return NULL;
361 }
362
363 sfw_batch_t *
364 sfw_bid2batch (lst_bid_t bid)
365 {
366         sfw_session_t *sn = sfw_data.fw_session;
367         sfw_batch_t   *bat;
368
369         LASSERT (sn != NULL);
370
371         bat = sfw_find_batch(bid);
372         if (bat != NULL)
373                 return bat;
374
375         LIBCFS_ALLOC(bat, sizeof(sfw_batch_t));
376         if (bat == NULL) 
377                 return NULL;
378
379         bat->bat_error    = 0;
380         bat->bat_session  = sn;
381         bat->bat_id       = bid;
382         cfs_atomic_set(&bat->bat_nactive, 0);
383         CFS_INIT_LIST_HEAD(&bat->bat_tests);
384
385         cfs_list_add_tail(&bat->bat_list, &sn->sn_batches);
386         return bat;
387 }
388
389 int
390 sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply)
391 {
392         sfw_session_t  *sn = sfw_data.fw_session;
393         sfw_counters_t *cnt = &reply->str_fw;
394         sfw_batch_t    *bat;
395         struct timeval  tv;
396
397         reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
398
399         if (request->str_sid.ses_nid == LNET_NID_ANY) {
400                 reply->str_status = EINVAL;
401                 return 0;
402         }
403
404         if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
405                 reply->str_status = ESRCH;
406                 return 0;
407         }
408
409         lnet_counters_get(&reply->str_lnet);
410         srpc_get_counters(&reply->str_rpc);
411
412         /* send over the msecs since the session was started
413          - with 32 bits to send, this is ~49 days */
414         cfs_duration_usec(cfs_time_sub(cfs_time_current(),
415                                        sn->sn_started), &tv);
416
417         cnt->running_ms      = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
418         cnt->brw_errors      = cfs_atomic_read(&sn->sn_brw_errors);
419         cnt->ping_errors     = cfs_atomic_read(&sn->sn_ping_errors);
420         cnt->zombie_sessions = cfs_atomic_read(&sfw_data.fw_nzombies);
421
422         cnt->active_batches = 0;
423         cfs_list_for_each_entry_typed (bat, &sn->sn_batches,
424                                        sfw_batch_t, bat_list) {
425                 if (cfs_atomic_read(&bat->bat_nactive) > 0)
426                         cnt->active_batches++;
427         }
428
429         reply->str_status = 0;
430         return 0;
431 }
432
433 int
434 sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply)
435 {
436         sfw_session_t *sn = sfw_data.fw_session;
437         srpc_msg_t    *msg = container_of(request, srpc_msg_t,
438                                           msg_body.mksn_reqst);
439
440         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
441                 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
442                 reply->mksn_status = EINVAL;
443                 return 0;
444         }
445
446         if (sn != NULL) {
447                 reply->mksn_status  = 0;
448                 reply->mksn_sid     = sn->sn_id;
449                 reply->mksn_timeout = sn->sn_timeout;
450
451                 if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
452                         cfs_atomic_inc(&sn->sn_refcount);
453                         return 0;
454                 }
455
456                 if (!request->mksn_force) {
457                         reply->mksn_status = EBUSY;
458                         strncpy(&reply->mksn_name[0], &sn->sn_name[0], LST_NAME_SIZE);
459                         return 0;
460                 }
461         }
462
463         /* reject the request if it requires unknown features
464          * NB: old version will always accept all features because it's not
465          * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
466          * harmless because it will return zero feature to console, and it's
467          * console's responsibility to make sure all nodes in a session have
468          * same feature mask. */
469         if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
470                 reply->mksn_status = EPROTO;
471                 return 0;
472         }
473
474         /* brand new or create by force */
475         LIBCFS_ALLOC(sn, sizeof(sfw_session_t));
476         if (sn == NULL) {
477                 CERROR ("Dropping RPC (mksn) under memory pressure.\n");
478                 return -ENOMEM;
479         }
480
481         sfw_init_session(sn, request->mksn_sid,
482                          msg->msg_ses_feats, &request->mksn_name[0]);
483
484         spin_lock(&sfw_data.fw_lock);
485
486         sfw_deactivate_session();
487         LASSERT(sfw_data.fw_session == NULL);
488         sfw_data.fw_session = sn;
489
490         spin_unlock(&sfw_data.fw_lock);
491
492         reply->mksn_status  = 0;
493         reply->mksn_sid     = sn->sn_id;
494         reply->mksn_timeout = sn->sn_timeout;
495         return 0;
496 }
497
498 int
499 sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply)
500 {
501         sfw_session_t *sn = sfw_data.fw_session;
502
503         reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
504
505         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
506                 reply->rmsn_status = EINVAL;
507                 return 0;
508         }
509
510         if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
511                 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY;
512                 return 0;
513         }
514
515         if (!cfs_atomic_dec_and_test(&sn->sn_refcount)) {
516                 reply->rmsn_status = 0;
517                 return 0;
518         }
519
520         spin_lock(&sfw_data.fw_lock);
521         sfw_deactivate_session();
522         spin_unlock(&sfw_data.fw_lock);
523
524         reply->rmsn_status = 0;
525         reply->rmsn_sid    = LST_INVALID_SID;
526         LASSERT(sfw_data.fw_session == NULL);
527         return 0;
528 }
529
530 int
531 sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply)
532 {
533         sfw_session_t *sn = sfw_data.fw_session;
534
535         if (sn == NULL) {
536                 reply->dbg_status = ESRCH;
537                 reply->dbg_sid    = LST_INVALID_SID;
538                 return 0;
539         } 
540
541         reply->dbg_status  = 0;
542         reply->dbg_sid     = sn->sn_id;      
543         reply->dbg_timeout = sn->sn_timeout;
544         strncpy(reply->dbg_name, &sn->sn_name[0], LST_NAME_SIZE);
545
546         return 0;
547 }
548
549 void
550 sfw_test_rpc_fini (srpc_client_rpc_t *rpc)
551 {
552         sfw_test_unit_t     *tsu = rpc->crpc_priv;
553         sfw_test_instance_t *tsi = tsu->tsu_instance;
554
555         /* Called with hold of tsi->tsi_lock */
556         LASSERT (cfs_list_empty(&rpc->crpc_list));
557         cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
558 }
559
560 static inline int
561 sfw_test_buffers(sfw_test_instance_t *tsi)
562 {
563         struct sfw_test_case    *tsc = sfw_find_test_case(tsi->tsi_service);
564         struct srpc_service     *svc = tsc->tsc_srv_service;
565         int                     nbuf;
566
567         nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
568         return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
569 }
570
571 int
572 sfw_load_test(struct sfw_test_instance *tsi)
573 {
574         struct sfw_test_case    *tsc = sfw_find_test_case(tsi->tsi_service);
575         struct srpc_service     *svc = tsc->tsc_srv_service;
576         int                     nbuf = sfw_test_buffers(tsi);
577         int                     rc;
578
579         LASSERT(tsc != NULL);
580
581         if (tsi->tsi_is_client) {
582                 tsi->tsi_ops = tsc->tsc_cli_ops;
583                 return 0;
584         }
585
586         rc = srpc_service_add_buffers(svc, nbuf);
587         if (rc != 0) {
588                 CWARN("Failed to reserve enough buffers: "
589                       "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc);
590                 /* NB: this error handler is not strictly correct, because
591                  * it may release more buffers than already allocated,
592                  * but it doesn't matter because request portal should
593                  * be lazy portal and will grow buffers if necessary. */
594                 srpc_service_remove_buffers(svc, nbuf);
595                 return -ENOMEM;
596         }
597
598         CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
599                nbuf * (srpc_serv_is_framework(svc) ?
600                        1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
601         return 0;
602 }
603
604 void
605 sfw_unload_test(struct sfw_test_instance *tsi)
606 {
607         struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service);
608
609         LASSERT(tsc != NULL);
610
611         if (tsi->tsi_is_client)
612                 return;
613
614         /* shrink buffers, because request portal is lazy portal
615          * which can grow buffers at runtime so we may leave
616          * some buffers behind, but never mind... */
617         srpc_service_remove_buffers(tsc->tsc_srv_service,
618                                     sfw_test_buffers(tsi));
619         return;
620 }
621
622 void
623 sfw_destroy_test_instance (sfw_test_instance_t *tsi)
624 {
625         srpc_client_rpc_t *rpc;
626         sfw_test_unit_t   *tsu;
627
628         if (!tsi->tsi_is_client) goto clean;
629
630         tsi->tsi_ops->tso_fini(tsi);
631
632         LASSERT (!tsi->tsi_stopping);
633         LASSERT (cfs_list_empty(&tsi->tsi_active_rpcs));
634         LASSERT (!sfw_test_active(tsi));
635
636         while (!cfs_list_empty(&tsi->tsi_units)) {
637                 tsu = cfs_list_entry(tsi->tsi_units.next,
638                                      sfw_test_unit_t, tsu_list);
639                 cfs_list_del(&tsu->tsu_list);
640                 LIBCFS_FREE(tsu, sizeof(*tsu));
641         }
642
643         while (!cfs_list_empty(&tsi->tsi_free_rpcs)) {
644                 rpc = cfs_list_entry(tsi->tsi_free_rpcs.next,
645                                      srpc_client_rpc_t, crpc_list);
646                 cfs_list_del(&rpc->crpc_list);
647                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
648         }
649
650 clean:
651         sfw_unload_test(tsi);
652         LIBCFS_FREE(tsi, sizeof(*tsi));
653         return;
654 }
655
656 void
657 sfw_destroy_batch (sfw_batch_t *tsb)
658 {
659         sfw_test_instance_t *tsi;
660
661         LASSERT (!sfw_batch_active(tsb));
662         LASSERT (cfs_list_empty(&tsb->bat_list));
663
664         while (!cfs_list_empty(&tsb->bat_tests)) {
665                 tsi = cfs_list_entry(tsb->bat_tests.next,
666                                      sfw_test_instance_t, tsi_list);
667                 cfs_list_del_init(&tsi->tsi_list);
668                 sfw_destroy_test_instance(tsi);
669         }
670
671         LIBCFS_FREE(tsb, sizeof(sfw_batch_t));
672         return;
673 }
674
675 void
676 sfw_destroy_session (sfw_session_t *sn)
677 {
678         sfw_batch_t *batch;
679
680         LASSERT (cfs_list_empty(&sn->sn_list));
681         LASSERT (sn != sfw_data.fw_session);
682
683         while (!cfs_list_empty(&sn->sn_batches)) {
684                 batch = cfs_list_entry(sn->sn_batches.next,
685                                        sfw_batch_t, bat_list);
686                 cfs_list_del_init(&batch->bat_list);
687                 sfw_destroy_batch(batch);
688         }
689
690         LIBCFS_FREE(sn, sizeof(*sn));
691         cfs_atomic_dec(&sfw_data.fw_nzombies);
692         return;
693 }
694
695 void
696 sfw_unpack_addtest_req(srpc_msg_t *msg)
697 {
698         srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
699
700         LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST);
701         LASSERT (req->tsr_is_client);
702
703         if (msg->msg_magic == SRPC_MSG_MAGIC)
704                 return; /* no flipping needed */
705
706         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
707
708         if (req->tsr_service == SRPC_SERVICE_BRW) {
709                 if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) {
710                         test_bulk_req_t *bulk = &req->tsr_u.bulk_v0;
711
712                         __swab32s(&bulk->blk_opc);
713                         __swab32s(&bulk->blk_npg);
714                         __swab32s(&bulk->blk_flags);
715
716                 } else {
717                         test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1;
718
719                         __swab16s(&bulk->blk_opc);
720                         __swab16s(&bulk->blk_flags);
721                         __swab32s(&bulk->blk_offset);
722                         __swab32s(&bulk->blk_len);
723                 }
724
725                 return;
726         }
727
728         if (req->tsr_service == SRPC_SERVICE_PING) {
729                 test_ping_req_t *ping = &req->tsr_u.ping;
730
731                 __swab32s(&ping->png_size);
732                 __swab32s(&ping->png_flags);
733                 return;
734         }
735
736         LBUG ();
737         return;
738 }
739
740 int
741 sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc)
742 {
743         srpc_msg_t          *msg = &rpc->srpc_reqstbuf->buf_msg;
744         srpc_test_reqst_t   *req = &msg->msg_body.tes_reqst;
745         srpc_bulk_t         *bk = rpc->srpc_bulk;
746         int                  ndest = req->tsr_ndest;
747         sfw_test_unit_t     *tsu;
748         sfw_test_instance_t *tsi;
749         int                  i;
750         int                  rc;
751
752         LIBCFS_ALLOC(tsi, sizeof(*tsi));
753         if (tsi == NULL) {
754                 CERROR ("Can't allocate test instance for batch: "LPU64"\n",
755                         tsb->bat_id.bat_id);
756                 return -ENOMEM;
757         }
758
759         memset(tsi, 0, sizeof(*tsi));
760         spin_lock_init(&tsi->tsi_lock);
761         cfs_atomic_set(&tsi->tsi_nactive, 0);
762         CFS_INIT_LIST_HEAD(&tsi->tsi_units);
763         CFS_INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
764         CFS_INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
765
766         tsi->tsi_stopping      = 0;
767         tsi->tsi_batch         = tsb;
768         tsi->tsi_loop          = req->tsr_loop;
769         tsi->tsi_concur        = req->tsr_concur;
770         tsi->tsi_service       = req->tsr_service;
771         tsi->tsi_is_client     = !!(req->tsr_is_client);
772         tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
773
774         rc = sfw_load_test(tsi);
775         if (rc != 0) {
776                 LIBCFS_FREE(tsi, sizeof(*tsi));
777                 return rc;
778         }
779
780         LASSERT (!sfw_batch_active(tsb));
781
782         if (!tsi->tsi_is_client) {
783                 /* it's test server, just add it to tsb */
784                 cfs_list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
785                 return 0;
786         }
787
788         LASSERT (bk != NULL);
789 #ifndef __KERNEL__
790         LASSERT (bk->bk_pages != NULL);
791 #endif
792         LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
793         LASSERT((unsigned int)bk->bk_len >=
794                 sizeof(lnet_process_id_packed_t) * ndest);
795
796         sfw_unpack_addtest_req(msg);
797         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
798
799         for (i = 0; i < ndest; i++) {
800                 lnet_process_id_packed_t *dests;
801                 lnet_process_id_packed_t  id;
802                 int                       j;
803
804 #ifdef __KERNEL__
805                 dests = cfs_page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
806                 LASSERT (dests != NULL);  /* my pages are within KVM always */
807 #else
808                 dests = cfs_page_address(bk->bk_pages[i / SFW_ID_PER_PAGE]);
809 #endif
810                 id = dests[i % SFW_ID_PER_PAGE];
811                 if (msg->msg_magic != SRPC_MSG_MAGIC)
812                         sfw_unpack_id(id);
813
814                 for (j = 0; j < tsi->tsi_concur; j++) {
815                         LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t));
816                         if (tsu == NULL) {
817                                 rc = -ENOMEM;
818                                 CERROR ("Can't allocate tsu for %d\n",
819                                         tsi->tsi_service);
820                                 goto error;
821                         }
822
823                         tsu->tsu_dest.nid = id.nid;
824                         tsu->tsu_dest.pid = id.pid;
825                         tsu->tsu_instance = tsi;
826                         tsu->tsu_private  = NULL;
827                         cfs_list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
828                 }
829         }
830
831         rc = tsi->tsi_ops->tso_init(tsi);
832         if (rc == 0) {
833                 cfs_list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
834                 return 0;
835         }
836
837 error:
838         LASSERT (rc != 0);
839         sfw_destroy_test_instance(tsi);
840         return rc;
841 }
842
843 static void
844 sfw_test_unit_done (sfw_test_unit_t *tsu)
845 {
846         sfw_test_instance_t *tsi = tsu->tsu_instance;
847         sfw_batch_t         *tsb = tsi->tsi_batch;
848         sfw_session_t       *sn = tsb->bat_session;
849
850         LASSERT (sfw_test_active(tsi));
851
852         if (!cfs_atomic_dec_and_test(&tsi->tsi_nactive))
853                 return;
854
855         /* the test instance is done */
856         spin_lock(&tsi->tsi_lock);
857
858         tsi->tsi_stopping = 0;
859
860         spin_unlock(&tsi->tsi_lock);
861
862         spin_lock(&sfw_data.fw_lock);
863
864         if (!cfs_atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */
865             sn == sfw_data.fw_session) {                  /* sn also active */
866                 spin_unlock(&sfw_data.fw_lock);
867                 return;
868         }
869
870         LASSERT (!cfs_list_empty(&sn->sn_list)); /* I'm a zombie! */
871
872         cfs_list_for_each_entry_typed (tsb, &sn->sn_batches,
873                                        sfw_batch_t, bat_list) {
874                 if (sfw_batch_active(tsb)) {
875                         spin_unlock(&sfw_data.fw_lock);
876                         return;
877                 }
878         }
879
880         cfs_list_del_init(&sn->sn_list);
881         spin_unlock(&sfw_data.fw_lock);
882
883         sfw_destroy_session(sn);
884         return;
885 }
886
887 void
888 sfw_test_rpc_done (srpc_client_rpc_t *rpc)
889 {
890         sfw_test_unit_t     *tsu = rpc->crpc_priv;
891         sfw_test_instance_t *tsi = tsu->tsu_instance;
892         int                  done = 0;
893
894         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
895
896         spin_lock(&tsi->tsi_lock);
897
898         LASSERT (sfw_test_active(tsi));
899         LASSERT (!cfs_list_empty(&rpc->crpc_list));
900
901         cfs_list_del_init(&rpc->crpc_list);
902
903         /* batch is stopping or loop is done or get error */
904         if (tsi->tsi_stopping ||
905             tsu->tsu_loop == 0 ||
906             (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr))
907                 done = 1;
908
909         /* dec ref for poster */
910         srpc_client_rpc_decref(rpc);
911
912         spin_unlock(&tsi->tsi_lock);
913
914         if (!done) {
915                 swi_schedule_workitem(&tsu->tsu_worker);
916                 return;
917         }
918
919         sfw_test_unit_done(tsu);
920         return;
921 }
922
923 int
924 sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer,
925                     unsigned features, int nblk, int blklen,
926                     srpc_client_rpc_t **rpcpp)
927 {
928         srpc_client_rpc_t   *rpc = NULL;
929         sfw_test_instance_t *tsi = tsu->tsu_instance;
930
931         spin_lock(&tsi->tsi_lock);
932
933         LASSERT (sfw_test_active(tsi));
934
935         if (!cfs_list_empty(&tsi->tsi_free_rpcs)) {
936                 /* pick request from buffer */
937                 rpc = cfs_list_entry(tsi->tsi_free_rpcs.next,
938                                      srpc_client_rpc_t, crpc_list);
939                 LASSERT (nblk == rpc->crpc_bulk.bk_niov);
940                 cfs_list_del_init(&rpc->crpc_list);
941         }
942
943         spin_unlock(&tsi->tsi_lock);
944
945         if (rpc == NULL) {
946                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
947                                              blklen, sfw_test_rpc_done,
948                                              sfw_test_rpc_fini, tsu);
949         } else {
950                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
951                                      blklen, sfw_test_rpc_done,
952                                      sfw_test_rpc_fini, tsu);
953         }
954
955         if (rpc == NULL) {
956                 CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
957                 return -ENOMEM;
958         }
959
960         rpc->crpc_reqstmsg.msg_ses_feats = features;
961         *rpcpp = rpc;
962
963         return 0;
964 }
965
966 int
967 sfw_run_test (swi_workitem_t *wi)
968 {
969         sfw_test_unit_t     *tsu = wi->swi_workitem.wi_data;
970         sfw_test_instance_t *tsi = tsu->tsu_instance;
971         srpc_client_rpc_t   *rpc = NULL;
972
973         LASSERT (wi == &tsu->tsu_worker);
974
975         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
976                 LASSERT (rpc == NULL);
977                 goto test_done;
978         }
979
980         LASSERT (rpc != NULL);
981
982         spin_lock(&tsi->tsi_lock);
983
984         if (tsi->tsi_stopping) {
985                 cfs_list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
986                 spin_unlock(&tsi->tsi_lock);
987                 goto test_done;
988         }
989
990         if (tsu->tsu_loop > 0)
991                 tsu->tsu_loop--;
992
993         cfs_list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
994         spin_unlock(&tsi->tsi_lock);
995
996         rpc->crpc_timeout = rpc_timeout;
997
998         spin_lock(&rpc->crpc_lock);
999         srpc_post_rpc(rpc);
1000         spin_unlock(&rpc->crpc_lock);
1001         return 0;
1002
1003 test_done:
1004         /*
1005          * No one can schedule me now since:
1006          * - previous RPC, if any, has done and
1007          * - no new RPC is initiated.
1008          * - my batch is still active; no one can run it again now.
1009          * Cancel pending schedules and prevent future schedule attempts:
1010          */
1011         swi_exit_workitem(wi);
1012         sfw_test_unit_done(tsu);
1013         return 1;
1014 }
1015
1016 int
1017 sfw_run_batch (sfw_batch_t *tsb)
1018 {
1019         swi_workitem_t      *wi;
1020         sfw_test_unit_t     *tsu;
1021         sfw_test_instance_t *tsi;
1022
1023         if (sfw_batch_active(tsb)) {
1024                 CDEBUG(D_NET, "Batch already active: "LPU64" (%d)\n",
1025                        tsb->bat_id.bat_id, cfs_atomic_read(&tsb->bat_nactive));
1026                 return 0;
1027         }
1028
1029         cfs_list_for_each_entry_typed (tsi, &tsb->bat_tests,
1030                                        sfw_test_instance_t, tsi_list) {
1031                 if (!tsi->tsi_is_client) /* skip server instances */
1032                         continue;
1033
1034                 LASSERT (!tsi->tsi_stopping);
1035                 LASSERT (!sfw_test_active(tsi));
1036
1037                 cfs_atomic_inc(&tsb->bat_nactive);
1038
1039                 cfs_list_for_each_entry_typed (tsu, &tsi->tsi_units,
1040                                                sfw_test_unit_t, tsu_list) {
1041                         cfs_atomic_inc(&tsi->tsi_nactive);
1042                         tsu->tsu_loop = tsi->tsi_loop;
1043                         wi = &tsu->tsu_worker;
1044                         swi_init_workitem(wi, tsu, sfw_run_test,
1045                                           lst_sched_test[\
1046                                           lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1047                         swi_schedule_workitem(wi);
1048                 }
1049         }
1050
1051         return 0;
1052 }
1053
1054 int
1055 sfw_stop_batch (sfw_batch_t *tsb, int force)
1056 {
1057         sfw_test_instance_t *tsi;
1058         srpc_client_rpc_t   *rpc;
1059
1060         if (!sfw_batch_active(tsb)) {
1061                 CDEBUG(D_NET, "Batch "LPU64" inactive\n", tsb->bat_id.bat_id);
1062                 return 0;
1063         }
1064
1065         cfs_list_for_each_entry_typed (tsi, &tsb->bat_tests,
1066                                        sfw_test_instance_t, tsi_list) {
1067                 spin_lock(&tsi->tsi_lock);
1068
1069                 if (!tsi->tsi_is_client ||
1070                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
1071                         spin_unlock(&tsi->tsi_lock);
1072                         continue;
1073                 }
1074
1075                 tsi->tsi_stopping = 1;
1076
1077                 if (!force) {
1078                         spin_unlock(&tsi->tsi_lock);
1079                         continue;
1080                 }
1081
1082                 /* abort launched rpcs in the test */
1083                 cfs_list_for_each_entry_typed(rpc, &tsi->tsi_active_rpcs,
1084                                               srpc_client_rpc_t, crpc_list) {
1085                         spin_lock(&rpc->crpc_lock);
1086
1087                         srpc_abort_rpc(rpc, -EINTR);
1088
1089                         spin_unlock(&rpc->crpc_lock);
1090                 }
1091
1092                 spin_unlock(&tsi->tsi_lock);
1093         }
1094
1095         return 0;
1096 }
1097
1098 int
1099 sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply)
1100 {
1101         sfw_test_instance_t *tsi;
1102
1103         if (testidx < 0)
1104                 return -EINVAL;
1105
1106         if (testidx == 0) {
1107                 reply->bar_active = cfs_atomic_read(&tsb->bat_nactive);
1108                 return 0;
1109         }
1110
1111         cfs_list_for_each_entry_typed (tsi, &tsb->bat_tests,
1112                                        sfw_test_instance_t, tsi_list) {
1113                 if (testidx-- > 1)
1114                         continue;
1115
1116                 reply->bar_active = cfs_atomic_read(&tsi->tsi_nactive);
1117                 return 0;
1118         }
1119
1120         return -ENOENT;
1121 }
1122
1123 void
1124 sfw_free_pages (srpc_server_rpc_t *rpc)
1125 {
1126         srpc_free_bulk(rpc->srpc_bulk);
1127         rpc->srpc_bulk = NULL;
1128 }
1129
1130 int
1131 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1132                 int sink)
1133 {
1134         LASSERT(rpc->srpc_bulk == NULL);
1135         LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1136
1137         rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1138         if (rpc->srpc_bulk == NULL)
1139                 return -ENOMEM;
1140
1141         return 0;
1142 }
1143
1144 int
1145 sfw_add_test (srpc_server_rpc_t *rpc)
1146 {
1147         sfw_session_t     *sn = sfw_data.fw_session;
1148         srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1149         srpc_test_reqst_t *request;
1150         int                rc;
1151         sfw_batch_t       *bat;
1152
1153         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1154         reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1155
1156         if (request->tsr_loop == 0 ||
1157             request->tsr_concur == 0 ||
1158             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1159             request->tsr_ndest > SFW_MAX_NDESTS ||
1160             (request->tsr_is_client && request->tsr_ndest == 0) ||
1161             request->tsr_concur > SFW_MAX_CONCUR ||
1162             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1163             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1164                 reply->tsr_status = EINVAL;
1165                 return 0;
1166         }
1167
1168         if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1169             sfw_find_test_case(request->tsr_service) == NULL) {
1170                 reply->tsr_status = ENOENT;
1171                 return 0;
1172         }
1173
1174         bat = sfw_bid2batch(request->tsr_bid);
1175         if (bat == NULL) {
1176                 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n",
1177                         rpc->srpc_scd->scd_svc->sv_name,
1178                         libcfs_id2str(rpc->srpc_peer));
1179                 return -ENOMEM;
1180         }
1181
1182         if (sfw_batch_active(bat)) {
1183                 reply->tsr_status = EBUSY;
1184                 return 0;
1185         }
1186
1187         if (request->tsr_is_client && rpc->srpc_bulk == NULL) {
1188                 /* rpc will be resumed later in sfw_bulk_ready */
1189                 int     npg = sfw_id_pages(request->tsr_ndest);
1190                 int     len;
1191
1192                 if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) {
1193                         len = npg * CFS_PAGE_SIZE;
1194
1195                 } else  {
1196                         len = sizeof(lnet_process_id_packed_t) *
1197                               request->tsr_ndest;
1198                 }
1199
1200                 return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1201         }
1202
1203         rc = sfw_add_test_instance(bat, rpc);
1204         CDEBUG (rc == 0 ? D_NET : D_WARNING,
1205                 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1206                 rc == 0 ? "Added" : "Failed to add", request->tsr_service,
1207                 request->tsr_is_client ? "client" : "server",
1208                 request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1209
1210         reply->tsr_status = (rc < 0) ? -rc : rc;
1211         return 0;
1212 }
1213
1214 int
1215 sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply)
1216 {
1217         sfw_session_t *sn = sfw_data.fw_session;
1218         int            rc = 0;
1219         sfw_batch_t   *bat;
1220
1221         reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id;
1222
1223         if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1224                 reply->bar_status = ESRCH;
1225                 return 0;
1226         }
1227
1228         bat = sfw_find_batch(request->bar_bid);
1229         if (bat == NULL) {
1230                 reply->bar_status = ENOENT;
1231                 return 0;
1232         }
1233
1234         switch (request->bar_opc) {
1235         case SRPC_BATCH_OPC_RUN:
1236                 rc = sfw_run_batch(bat);
1237                 break;
1238
1239         case SRPC_BATCH_OPC_STOP:
1240                 rc = sfw_stop_batch(bat, request->bar_arg);
1241                 break;
1242
1243         case SRPC_BATCH_OPC_QUERY:
1244                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1245                 break;
1246
1247         default:
1248                 return -EINVAL; /* drop it */
1249         }
1250
1251         reply->bar_status = (rc < 0) ? -rc : rc;
1252         return 0;
1253 }
1254
1255 int
1256 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1257 {
1258         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1259         srpc_msg_t     *reply   = &rpc->srpc_replymsg;
1260         srpc_msg_t     *request = &rpc->srpc_reqstbuf->buf_msg;
1261         unsigned        features = LST_FEATS_MASK;
1262         int             rc = 0;
1263
1264         LASSERT(sfw_data.fw_active_srpc == NULL);
1265         LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1266
1267         spin_lock(&sfw_data.fw_lock);
1268
1269         if (sfw_data.fw_shuttingdown) {
1270                 spin_unlock(&sfw_data.fw_lock);
1271                 return -ESHUTDOWN;
1272         }
1273
1274         /* Remove timer to avoid racing with it or expiring active session */
1275         if (sfw_del_session_timer() != 0) {
1276                 CERROR("Dropping RPC (%s) from %s: racing with expiry timer.",
1277                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1278                 spin_unlock(&sfw_data.fw_lock);
1279                 return -EAGAIN;
1280         }
1281
1282         sfw_data.fw_active_srpc = rpc;
1283         spin_unlock(&sfw_data.fw_lock);
1284
1285         sfw_unpack_message(request);
1286         LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1287
1288         /* rpc module should have checked this */
1289         LASSERT(request->msg_version == SRPC_MSG_VERSION);
1290
1291         if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1292             sv->sv_id != SRPC_SERVICE_DEBUG) {
1293                 sfw_session_t *sn = sfw_data.fw_session;
1294
1295                 if (sn != NULL &&
1296                     sn->sn_features != request->msg_ses_feats) {
1297                         CNETERR("Features of framework RPC don't match "
1298                                 "features of current session: %x/%x\n",
1299                                 request->msg_ses_feats, sn->sn_features);
1300                         reply->msg_body.reply.status = EPROTO;
1301                         reply->msg_body.reply.sid    = sn->sn_id;
1302                         goto out;
1303                 }
1304
1305         } else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
1306                 /* NB: at this point, old version will ignore features and
1307                  * create new session anyway, so console should be able
1308                  * to handle this */
1309                 reply->msg_body.reply.status = EPROTO;
1310                 goto out;
1311         }
1312
1313         switch(sv->sv_id) {
1314         default:
1315                 LBUG ();
1316         case SRPC_SERVICE_TEST:
1317                 rc = sfw_add_test(rpc);
1318                 break;
1319
1320         case SRPC_SERVICE_BATCH:
1321                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1322                                        &reply->msg_body.bat_reply);
1323                 break;
1324
1325         case SRPC_SERVICE_QUERY_STAT:
1326                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1327                                    &reply->msg_body.stat_reply);
1328                 break;
1329
1330         case SRPC_SERVICE_DEBUG:
1331                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1332                                        &reply->msg_body.dbg_reply);
1333                 break;
1334
1335         case SRPC_SERVICE_MAKE_SESSION:
1336                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1337                                       &reply->msg_body.mksn_reply);
1338                 break;
1339
1340         case SRPC_SERVICE_REMOVE_SESSION:
1341                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1342                                         &reply->msg_body.rmsn_reply);
1343                 break;
1344         }
1345
1346         if (sfw_data.fw_session != NULL)
1347                 features = sfw_data.fw_session->sn_features;
1348  out:
1349         reply->msg_ses_feats = features;
1350         rpc->srpc_done = sfw_server_rpc_done;
1351         spin_lock(&sfw_data.fw_lock);
1352
1353 #ifdef __KERNEL__
1354         if (!sfw_data.fw_shuttingdown)
1355                 sfw_add_session_timer();
1356 #else
1357         LASSERT(!sfw_data.fw_shuttingdown);
1358         sfw_add_session_timer();
1359 #endif
1360
1361         sfw_data.fw_active_srpc = NULL;
1362         spin_unlock(&sfw_data.fw_lock);
1363         return rc;
1364 }
1365
1366 int
1367 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1368 {
1369         struct srpc_service     *sv = rpc->srpc_scd->scd_svc;
1370         int                     rc;
1371
1372         LASSERT(rpc->srpc_bulk != NULL);
1373         LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1374         LASSERT(sfw_data.fw_active_srpc == NULL);
1375         LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1376
1377         spin_lock(&sfw_data.fw_lock);
1378
1379         if (status != 0) {
1380                 CERROR("Bulk transfer failed for RPC: "
1381                        "service %s, peer %s, status %d\n",
1382                        sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1383                 spin_unlock(&sfw_data.fw_lock);
1384                 return -EIO;
1385         }
1386
1387         if (sfw_data.fw_shuttingdown) {
1388                 spin_unlock(&sfw_data.fw_lock);
1389                 return -ESHUTDOWN;
1390         }
1391
1392         if (sfw_del_session_timer() != 0) {
1393                 CERROR("Dropping RPC (%s) from %s: racing with expiry timer",
1394                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1395                 spin_unlock(&sfw_data.fw_lock);
1396                 return -EAGAIN;
1397         }
1398
1399         sfw_data.fw_active_srpc = rpc;
1400         spin_unlock(&sfw_data.fw_lock);
1401
1402         rc = sfw_add_test(rpc);
1403
1404         spin_lock(&sfw_data.fw_lock);
1405
1406 #ifdef __KERNEL__
1407         if (!sfw_data.fw_shuttingdown)
1408                 sfw_add_session_timer();
1409 #else
1410         LASSERT(!sfw_data.fw_shuttingdown);
1411         sfw_add_session_timer();
1412 #endif
1413
1414         sfw_data.fw_active_srpc = NULL;
1415         spin_unlock(&sfw_data.fw_lock);
1416         return rc;
1417 }
1418
1419 srpc_client_rpc_t *
1420 sfw_create_rpc(lnet_process_id_t peer, int service,
1421                unsigned features, int nbulkiov, int bulklen,
1422                void (*done)(srpc_client_rpc_t *), void *priv)
1423 {
1424         srpc_client_rpc_t *rpc = NULL;
1425
1426         spin_lock(&sfw_data.fw_lock);
1427
1428         LASSERT (!sfw_data.fw_shuttingdown);
1429         LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1430
1431         if (nbulkiov == 0 && !cfs_list_empty(&sfw_data.fw_zombie_rpcs)) {
1432                 rpc = cfs_list_entry(sfw_data.fw_zombie_rpcs.next,
1433                                      srpc_client_rpc_t, crpc_list);
1434                 cfs_list_del(&rpc->crpc_list);
1435
1436                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1437                                      done, sfw_client_rpc_fini, priv);
1438         }
1439
1440         spin_unlock(&sfw_data.fw_lock);
1441
1442         if (rpc == NULL) {
1443                 rpc = srpc_create_client_rpc(peer, service,
1444                                              nbulkiov, bulklen, done,
1445                                              nbulkiov != 0 ?  NULL :
1446                                              sfw_client_rpc_fini,
1447                                              priv);
1448         }
1449
1450         if (rpc != NULL) /* "session" is concept in framework */
1451                 rpc->crpc_reqstmsg.msg_ses_feats = features;
1452
1453         return rpc;
1454 }
1455
1456 void
1457 sfw_unpack_message (srpc_msg_t *msg)
1458 {
1459         if (msg->msg_magic == SRPC_MSG_MAGIC)
1460                 return; /* no flipping needed */
1461
1462         /* srpc module should guarantee I wouldn't get crap */
1463         LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1464
1465         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1466                 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst;
1467
1468                 __swab32s(&req->str_type);
1469                 __swab64s(&req->str_rpyid);
1470                 sfw_unpack_sid(req->str_sid);
1471                 return;
1472         }
1473
1474         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1475                 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply;
1476
1477                 __swab32s(&rep->str_status);
1478                 sfw_unpack_sid(rep->str_sid);
1479                 sfw_unpack_fw_counters(rep->str_fw);
1480                 sfw_unpack_rpc_counters(rep->str_rpc);
1481                 sfw_unpack_lnet_counters(rep->str_lnet);
1482                 return;
1483         }
1484
1485         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1486                 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst;
1487
1488                 __swab64s(&req->mksn_rpyid);
1489                 __swab32s(&req->mksn_force);
1490                 sfw_unpack_sid(req->mksn_sid);
1491                 return;
1492         }
1493
1494         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1495                 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply;
1496
1497                 __swab32s(&rep->mksn_status);
1498                 __swab32s(&rep->mksn_timeout);
1499                 sfw_unpack_sid(rep->mksn_sid);
1500                 return;
1501         }
1502
1503         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1504                 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst;
1505
1506                 __swab64s(&req->rmsn_rpyid);
1507                 sfw_unpack_sid(req->rmsn_sid);
1508                 return;
1509         }
1510
1511         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1512                 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply;
1513
1514                 __swab32s(&rep->rmsn_status);
1515                 sfw_unpack_sid(rep->rmsn_sid);
1516                 return;
1517         }
1518
1519         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1520                 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst;
1521
1522                 __swab64s(&req->dbg_rpyid);
1523                 __swab32s(&req->dbg_flags);
1524                 sfw_unpack_sid(req->dbg_sid);
1525                 return;
1526         }
1527
1528         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1529                 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply;
1530
1531                 __swab32s(&rep->dbg_nbatch);
1532                 __swab32s(&rep->dbg_timeout);
1533                 sfw_unpack_sid(rep->dbg_sid);
1534                 return;
1535         }
1536
1537         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1538                 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst;
1539
1540                 __swab32s(&req->bar_opc);
1541                 __swab64s(&req->bar_rpyid);
1542                 __swab32s(&req->bar_testidx);
1543                 __swab32s(&req->bar_arg);
1544                 sfw_unpack_sid(req->bar_sid);
1545                 __swab64s(&req->bar_bid.bat_id);
1546                 return;
1547         }
1548
1549         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1550                 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply;
1551
1552                 __swab32s(&rep->bar_status);
1553                 sfw_unpack_sid(rep->bar_sid);
1554                 return;
1555         }
1556
1557         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1558                 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst;
1559
1560                 __swab64s(&req->tsr_rpyid);
1561                 __swab64s(&req->tsr_bulkid);
1562                 __swab32s(&req->tsr_loop);
1563                 __swab32s(&req->tsr_ndest);
1564                 __swab32s(&req->tsr_concur);
1565                 __swab32s(&req->tsr_service);
1566                 sfw_unpack_sid(req->tsr_sid);
1567                 __swab64s(&req->tsr_bid.bat_id);
1568                 return;
1569         }
1570
1571         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1572                 srpc_test_reply_t *rep = &msg->msg_body.tes_reply;
1573
1574                 __swab32s(&rep->tsr_status);
1575                 sfw_unpack_sid(rep->tsr_sid);
1576                 return;
1577         }
1578
1579         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1580                 srpc_join_reqst_t *req = &msg->msg_body.join_reqst;
1581
1582                 __swab64s(&req->join_rpyid);
1583                 sfw_unpack_sid(req->join_sid);
1584                 return;
1585         }
1586
1587         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1588                 srpc_join_reply_t *rep = &msg->msg_body.join_reply;
1589
1590                 __swab32s(&rep->join_status);
1591                 __swab32s(&rep->join_timeout);
1592                 sfw_unpack_sid(rep->join_sid);
1593                 return;
1594         }
1595
1596         LBUG ();
1597         return;
1598 }
1599
1600 void
1601 sfw_abort_rpc (srpc_client_rpc_t *rpc)
1602 {
1603         LASSERT(cfs_atomic_read(&rpc->crpc_refcount) > 0);
1604         LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1605
1606         spin_lock(&rpc->crpc_lock);
1607         srpc_abort_rpc(rpc, -EINTR);
1608         spin_unlock(&rpc->crpc_lock);
1609         return;
1610 }
1611
1612 void
1613 sfw_post_rpc (srpc_client_rpc_t *rpc)
1614 {
1615         spin_lock(&rpc->crpc_lock);
1616
1617         LASSERT (!rpc->crpc_closed);
1618         LASSERT (!rpc->crpc_aborted);
1619         LASSERT (cfs_list_empty(&rpc->crpc_list));
1620         LASSERT (!sfw_data.fw_shuttingdown);
1621
1622         rpc->crpc_timeout = rpc_timeout;
1623         srpc_post_rpc(rpc);
1624
1625         spin_unlock(&rpc->crpc_lock);
1626         return;
1627 }
1628
1629 static srpc_service_t sfw_services[] = 
1630 {
1631         {
1632                 /* sv_id */    SRPC_SERVICE_DEBUG,
1633                 /* sv_name */  "debug",
1634                 0
1635         },
1636         {
1637                 /* sv_id */    SRPC_SERVICE_QUERY_STAT,
1638                 /* sv_name */  "query stats",
1639                 0
1640         },
1641         {
1642                 /* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1643                 /* sv_name */  "make session",
1644                 0
1645         },
1646         {
1647                 /* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1648                 /* sv_name */  "remove session",
1649                 0
1650         },
1651         {
1652                 /* sv_id */    SRPC_SERVICE_BATCH,
1653                 /* sv_name */  "batch service",
1654                 0
1655         },
1656         {
1657                 /* sv_id */    SRPC_SERVICE_TEST,
1658                 /* sv_name */  "test service",
1659                 0
1660         },
1661         {
1662                 /* sv_id */    0,
1663                 /* sv_name */  NULL,
1664                 0
1665         }
1666 };
1667
1668 extern sfw_test_client_ops_t ping_test_client;
1669 extern srpc_service_t        ping_test_service;
1670 extern void ping_init_test_client(void);
1671 extern void ping_init_test_service(void);
1672
1673 extern sfw_test_client_ops_t brw_test_client;
1674 extern srpc_service_t        brw_test_service;
1675 extern void brw_init_test_client(void);
1676 extern void brw_init_test_service(void);
1677
1678
1679 int
1680 sfw_startup (void)
1681 {
1682         int              i;
1683         int              rc;
1684         int              error;
1685         srpc_service_t  *sv;
1686         sfw_test_case_t *tsc;
1687
1688 #ifndef __KERNEL__
1689         char *s;
1690
1691         s = getenv("SESSION_TIMEOUT");
1692         session_timeout = s != NULL ? atoi(s) : session_timeout;
1693
1694         s = getenv("RPC_TIMEOUT");
1695         rpc_timeout = s != NULL ? atoi(s) : rpc_timeout;
1696 #endif
1697
1698         if (session_timeout < 0) {
1699                 CERROR ("Session timeout must be non-negative: %d\n",
1700                         session_timeout);
1701                 return -EINVAL;
1702         }
1703
1704         if (rpc_timeout < 0) {
1705                 CERROR ("RPC timeout must be non-negative: %d\n",
1706                         rpc_timeout);
1707                 return -EINVAL;
1708         }
1709
1710         if (session_timeout == 0)
1711                 CWARN ("Zero session_timeout specified "
1712                        "- test sessions never expire.\n");
1713
1714         if (rpc_timeout == 0)
1715                 CWARN ("Zero rpc_timeout specified "
1716                        "- test RPC never expire.\n");
1717
1718         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1719
1720         sfw_data.fw_session     = NULL;
1721         sfw_data.fw_active_srpc = NULL;
1722         spin_lock_init(&sfw_data.fw_lock);
1723         cfs_atomic_set(&sfw_data.fw_nzombies, 0);
1724         CFS_INIT_LIST_HEAD(&sfw_data.fw_tests);
1725         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1726         CFS_INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1727
1728         brw_init_test_client();
1729         brw_init_test_service();
1730         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1731         LASSERT (rc == 0);
1732
1733         ping_init_test_client();
1734         ping_init_test_service();
1735         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1736         LASSERT (rc == 0);
1737
1738         error = 0;
1739         cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
1740                                        sfw_test_case_t, tsc_list) {
1741                 sv = tsc->tsc_srv_service;
1742
1743                 rc = srpc_add_service(sv);
1744                 LASSERT (rc != -EBUSY);
1745                 if (rc != 0) {
1746                         CWARN ("Failed to add %s service: %d\n",
1747                                sv->sv_name, rc);
1748                         error = rc;
1749                 }
1750         }
1751
1752         for (i = 0; ; i++) {
1753                 sv = &sfw_services[i];
1754                 if (sv->sv_name == NULL) break;
1755
1756                 sv->sv_bulk_ready = NULL;
1757                 sv->sv_handler    = sfw_handle_server_rpc;
1758                 sv->sv_wi_total   = SFW_FRWK_WI_MAX;
1759                 if (sv->sv_id == SRPC_SERVICE_TEST)
1760                         sv->sv_bulk_ready = sfw_bulk_ready;
1761
1762                 rc = srpc_add_service(sv);
1763                 LASSERT (rc != -EBUSY);
1764                 if (rc != 0) {
1765                         CWARN ("Failed to add %s service: %d\n",
1766                                sv->sv_name, rc);
1767                         error = rc;
1768                 }
1769
1770                 /* about to sfw_shutdown, no need to add buffer */
1771                 if (error) continue;
1772
1773                 rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1774                 if (rc != 0) {
1775                         CWARN("Failed to reserve enough buffers: "
1776                               "service %s, %d needed: %d\n",
1777                               sv->sv_name, sv->sv_wi_total, rc);
1778                         error = -ENOMEM;
1779                 }
1780         }
1781
1782         if (error != 0)
1783                 sfw_shutdown();
1784         return error;
1785 }
1786
1787 void
1788 sfw_shutdown (void)
1789 {
1790         srpc_service_t  *sv;
1791         sfw_test_case_t *tsc;
1792         int              i;
1793
1794         spin_lock(&sfw_data.fw_lock);
1795
1796         sfw_data.fw_shuttingdown = 1;
1797 #ifdef __KERNEL__
1798         lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock,
1799                        "waiting for active RPC to finish.\n");
1800 #else
1801         LASSERT (sfw_data.fw_active_srpc == NULL);
1802 #endif
1803
1804         if (sfw_del_session_timer() != 0)
1805                 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock,
1806                                "waiting for session timer to explode.\n");
1807
1808         sfw_deactivate_session();
1809         lst_wait_until(cfs_atomic_read(&sfw_data.fw_nzombies) == 0,
1810                        sfw_data.fw_lock,
1811                        "waiting for %d zombie sessions to die.\n",
1812                        cfs_atomic_read(&sfw_data.fw_nzombies));
1813
1814         spin_unlock(&sfw_data.fw_lock);
1815
1816         for (i = 0; ; i++) {
1817                 sv = &sfw_services[i];
1818                 if (sv->sv_name == NULL)
1819                         break;
1820
1821                 srpc_shutdown_service(sv);
1822                 srpc_remove_service(sv);
1823         }
1824
1825         cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests,
1826                                        sfw_test_case_t, tsc_list) {
1827                 sv = tsc->tsc_srv_service;
1828                 srpc_shutdown_service(sv);
1829                 srpc_remove_service(sv);
1830         }
1831
1832         while (!cfs_list_empty(&sfw_data.fw_zombie_rpcs)) {
1833                 srpc_client_rpc_t *rpc;
1834
1835                 rpc = cfs_list_entry(sfw_data.fw_zombie_rpcs.next, 
1836                                      srpc_client_rpc_t, crpc_list);
1837                 cfs_list_del(&rpc->crpc_list);
1838
1839                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1840         }
1841
1842         for (i = 0; ; i++) {
1843                 sv = &sfw_services[i];
1844                 if (sv->sv_name == NULL)
1845                         break;
1846
1847                 srpc_wait_service_shutdown(sv);
1848         }
1849
1850         while (!cfs_list_empty(&sfw_data.fw_tests)) {
1851                 tsc = cfs_list_entry(sfw_data.fw_tests.next,
1852                                      sfw_test_case_t, tsc_list);
1853
1854                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1855
1856                 cfs_list_del(&tsc->tsc_list);
1857                 LIBCFS_FREE(tsc, sizeof(*tsc));
1858         }
1859
1860         return;
1861 }