/* Source: fs/lustre-release.git — lnet/selftest/workitem.c (b=16098, via Whamcloud gitweb) */
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright  2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/workitem.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 */
#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

44
/* Global state for the selftest workitem schedulers: two run queues
 * (concurrent and strictly-serialised), their wait queues, and the single
 * lock that protects all of it. */
struct smoketest_workitem {
        struct list_head wi_runq;         /* concurrent workitems */
        struct list_head wi_serial_runq;  /* serialised workitems */
        cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
        cfs_waitq_t      wi_serial_waitq; /* where serial scheduler sleep */
        spinlock_t       wi_lock;         /* serialize */
        int              wi_shuttingdown; /* set once by swi_shutdown(); read under wi_lock */
        int              wi_nthreads;     /* # scheduler threads still alive (kernel only) */
} swi_data;
54
55 static inline int
56 swi_sched_cansleep (struct list_head *q)
57 {
58         int rc;
59
60         spin_lock(&swi_data.wi_lock);
61
62         rc = !swi_data.wi_shuttingdown && list_empty(q);
63
64         spin_unlock(&swi_data.wi_lock);
65         return rc;
66 }
67
/* XXX:
 * 0. it only works when called from wi->wi_action.
 * 1. when it returns no one shall try to schedule the workitem.
 */
void
swi_kill_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

#ifdef __KERNEL__
        /* caller must be the scheduler thread executing wi->wi_action,
         * so the item is necessarily marked running */
        LASSERT (wi->wi_running);
#endif

        if (wi->wi_scheduled) { /* cancel pending schedules */
                LASSERT (!list_empty(&wi->wi_list));
                list_del_init(&wi->wi_list);
        }

        LASSERT (list_empty(&wi->wi_list));
        /* deliberately left set: any later swi_schedule_*() on this dead
         * workitem will trip its list_empty() LASSERT instead of silently
         * re-queueing it */
        wi->wi_scheduled = 1; /* LBUG future schedule attempts */

        spin_unlock(&swi_data.wi_lock);
        return;
}
95
/*
 * Queue @wi on the concurrent run queue and wake one scheduler thread.
 * Idempotent: a workitem that is already scheduled is left where it is.
 */
void
swi_schedule_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

        if (!wi->wi_scheduled) {
                LASSERT (list_empty(&wi->wi_list));

                wi->wi_scheduled = 1;
                list_add_tail(&wi->wi_list, &swi_data.wi_runq);
                cfs_waitq_signal(&swi_data.wi_waitq);
        }

        /* whether newly queued or already pending, it is on a runq now */
        LASSERT (!list_empty(&wi->wi_list));
        spin_unlock(&swi_data.wi_lock);
        return;
}
116
/*
 * Workitem scheduled by this function is strictly serialised not only with
 * itself, but also with others scheduled this way.
 *
 * Now there's only one static serialised queue, but in the future more might
 * be added, and even dynamic creation of serialised queues might be supported.
 */
void
swi_schedule_serial_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

        if (!wi->wi_scheduled) { /* idempotent: skip if already queued */
                LASSERT (list_empty(&wi->wi_list));

                wi->wi_scheduled = 1;
                list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
                cfs_waitq_signal(&swi_data.wi_serial_waitq);
        }

        /* whether newly queued or already pending, it is on a runq now */
        LASSERT (!list_empty(&wi->wi_list));
        spin_unlock(&swi_data.wi_lock);
        return;
}
144
145 #ifdef __KERNEL__
146
/*
 * Body of one concurrent scheduler thread: drain swi_data.wi_runq, running
 * each workitem's wi_action with wi_lock dropped, until shutdown.
 * At most SWI_RESCHED items are processed per pass before yielding the CPU,
 * so one busy queue cannot starve the rest of the system.
 */
int
swi_scheduler_main (void *arg)
{
        int  id = (long) arg;
        char name[16];

        snprintf(name, sizeof(name), "swi_sd%03d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        spin_lock(&swi_data.wi_lock);

        while (!swi_data.wi_shuttingdown) {
                int             nloops = 0;
                int             rc;
                swi_workitem_t *wi;

                while (!list_empty(&swi_data.wi_runq) && 
                       nloops < SWI_RESCHED) {
                        wi = list_entry(swi_data.wi_runq.next,
                                        swi_workitem_t, wi_list);
                        list_del_init(&wi->wi_list);

                        LASSERT (wi->wi_scheduled);

                        nloops++;
                        if (wi->wi_running) {
                                /* another scheduler thread is still inside
                                 * this item's wi_action; push it to the back
                                 * of the queue and try the next one */
                                list_add_tail(&wi->wi_list, &swi_data.wi_runq);
                                continue;
                        }

                        wi->wi_running   = 1;
                        wi->wi_scheduled = 0;
                        spin_unlock(&swi_data.wi_lock);

                        /* run the action unlocked; it may re-schedule or
                         * (via swi_kill_workitem) retire the item */
                        rc = (*wi->wi_action) (wi);

                        spin_lock(&swi_data.wi_lock);
                        if (rc == 0) /* wi still active */
                                wi->wi_running = 0;
                        /* NOTE(review): rc != 0 presumably means wi_action
                         * disposed of wi, so it must not be touched here —
                         * confirm against wi_action implementations */
                }

                spin_unlock(&swi_data.wi_lock);

                if (nloops < SWI_RESCHED)
                        /* queue drained: sleep until new work or shutdown */
                        wait_event_interruptible_exclusive(
                                   swi_data.wi_waitq,
                                   !swi_sched_cansleep(&swi_data.wi_runq));
                else
                        /* batch limit hit: yield, then continue draining */
                        our_cond_resched();

                spin_lock(&swi_data.wi_lock);
        }

        /* tell swi_shutdown() this thread is gone */
        swi_data.wi_nthreads--;
        spin_unlock(&swi_data.wi_lock);
        return 0;
}
205
/*
 * Body of the single serial scheduler thread: drain wi_serial_runq exactly
 * like swi_scheduler_main(), but since only this thread services the queue,
 * serialised workitems can never run concurrently with one another.
 */
int
swi_serial_scheduler_main (void *arg)
{
        UNUSED (arg);

        cfs_daemonize("swi_serial_sd");
        cfs_block_allsigs();

        spin_lock(&swi_data.wi_lock);

        while (!swi_data.wi_shuttingdown) {
                int             nloops = 0;
                int             rc;
                swi_workitem_t *wi;

                while (!list_empty(&swi_data.wi_serial_runq) && 
                       nloops < SWI_RESCHED) {
                        wi = list_entry(swi_data.wi_serial_runq.next,
                                        swi_workitem_t, wi_list);
                        list_del_init(&wi->wi_list);

                        /* single consumer: a serial item can never already
                         * be running when it is dequeued */
                        LASSERT (!wi->wi_running);
                        LASSERT (wi->wi_scheduled);

                        nloops++;
                        wi->wi_running   = 1;
                        wi->wi_scheduled = 0;
                        spin_unlock(&swi_data.wi_lock);

                        /* run the action unlocked; it may re-schedule or
                         * (via swi_kill_workitem) retire the item */
                        rc = (*wi->wi_action) (wi);

                        spin_lock(&swi_data.wi_lock);
                        if (rc == 0) /* wi still active */
                                wi->wi_running = 0;
                        /* NOTE(review): rc != 0 presumably means wi_action
                         * disposed of wi — confirm against callers */
                }

                spin_unlock(&swi_data.wi_lock);

                if (nloops < SWI_RESCHED)
                        /* queue drained: sleep until new work or shutdown */
                        wait_event_interruptible_exclusive(
                             swi_data.wi_serial_waitq, 
                             !swi_sched_cansleep(&swi_data.wi_serial_runq));
                else
                        /* batch limit hit: yield, then continue draining */
                        our_cond_resched();

                spin_lock(&swi_data.wi_lock);
        }

        /* tell swi_shutdown() this thread is gone */
        swi_data.wi_nthreads--;
        spin_unlock(&swi_data.wi_lock);
        return 0;
}
258
259 int
260 swi_start_thread (int (*func) (void*), void *arg)
261 {
262         long pid;
263
264         LASSERT (!swi_data.wi_shuttingdown);
265
266         pid = cfs_kernel_thread(func, arg, 0);
267         if (pid < 0)
268                 return (int)pid;
269
270         spin_lock(&swi_data.wi_lock);
271         swi_data.wi_nthreads++;
272         spin_unlock(&swi_data.wi_lock);
273         return 0;
274 }
275
276 #else /* __KERNEL__ */
277
278 int
279 swi_check_events (void)
280 {
281         int               n = 0;
282         swi_workitem_t   *wi;
283         struct list_head *q;
284
285         spin_lock(&swi_data.wi_lock);
286
287         for (;;) {
288                 if (!list_empty(&swi_data.wi_serial_runq))
289                         q = &swi_data.wi_serial_runq;
290                 else if (!list_empty(&swi_data.wi_runq))
291                         q = &swi_data.wi_runq;
292                 else
293                         break;
294
295                 wi = list_entry(q->next, swi_workitem_t, wi_list);
296                 list_del_init(&wi->wi_list);
297
298                 LASSERT (wi->wi_scheduled);
299                 wi->wi_scheduled = 0;
300                 spin_unlock(&swi_data.wi_lock);
301
302                 n++;
303                 (*wi->wi_action) (wi);
304
305                 spin_lock(&swi_data.wi_lock);
306         }
307
308         spin_unlock(&swi_data.wi_lock);
309         return n;
310 }
311
312 #endif
313
314 int
315 swi_startup (void)
316 {
317         int i;
318         int rc;
319
320         swi_data.wi_nthreads = 0;
321         swi_data.wi_shuttingdown = 0;
322         spin_lock_init(&swi_data.wi_lock);
323         cfs_waitq_init(&swi_data.wi_waitq);
324         cfs_waitq_init(&swi_data.wi_serial_waitq);
325         CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
326         CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);
327
328 #ifdef __KERNEL__
329         rc = swi_start_thread(swi_serial_scheduler_main, NULL);
330         if (rc != 0) {
331                 LASSERT (swi_data.wi_nthreads == 0);
332                 CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
333                 return rc;
334         }
335
336         for (i = 0; i < num_online_cpus(); i++) {
337                 rc = swi_start_thread(swi_scheduler_main, (void *) (long) i);
338                 if (rc != 0) {
339                         CERROR ("Can't spawn workitem scheduler: %d\n", rc);
340                         swi_shutdown();
341                         return rc;
342                 }
343         }
344 #else
345         UNUSED(i);
346         UNUSED(rc);
347 #endif
348
349         return 0;
350 }
351
/*
 * Tear down the workitem module.  Both run queues must already be empty
 * (no workitem may still be scheduled); sets wi_shuttingdown, wakes every
 * scheduler thread, and (kernel only) waits for them all to exit.
 */
void
swi_shutdown (void)
{
        spin_lock(&swi_data.wi_lock);

        LASSERT (list_empty(&swi_data.wi_runq));
        LASSERT (list_empty(&swi_data.wi_serial_runq));

        swi_data.wi_shuttingdown = 1;

#ifdef __KERNEL__
        cfs_waitq_broadcast(&swi_data.wi_waitq);
        cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
        /* NOTE(review): wi_lock is passed in, so lst_wait_until presumably
         * drops and retakes it while waiting (the exiting threads need it
         * to decrement wi_nthreads) — confirm against the macro */
        lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
                       "waiting for %d threads to terminate\n",
                       swi_data.wi_nthreads);
#endif

        spin_unlock(&swi_data.wi_lock);
        return;
}