Whamcloud - gitweb
b=22598 osd_trans_stop() page fault fix
[fs/lustre-release.git] / lnet / selftest / workitem.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/workitem.c
37  *
38  * Author: Isaac Huang <isaac@clusterfs.com>
39  */
40 #define DEBUG_SUBSYSTEM S_LNET
41
42 #include "selftest.h"
43
44
45 struct smoketest_workitem {
46         cfs_list_t       wi_runq;         /* concurrent workitems */
47         cfs_list_t       wi_serial_runq;  /* serialised workitems */
48         cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
49         cfs_waitq_t      wi_serial_waitq; /* where serial scheduler sleep */
50         cfs_spinlock_t   wi_lock;         /* serialize */
51         int              wi_shuttingdown;
52         int              wi_nthreads;
53 } swi_data;
54
55 static inline int
56 swi_sched_cansleep (cfs_list_t *q)
57 {
58         int rc;
59
60         cfs_spin_lock(&swi_data.wi_lock);
61
62         rc = !swi_data.wi_shuttingdown && cfs_list_empty(q);
63
64         cfs_spin_unlock(&swi_data.wi_lock);
65         return rc;
66 }
67
68 /* XXX: 
69  * 0. it only works when called from wi->wi_action.
70  * 1. when it returns no one shall try to schedule the workitem.
71  */
72 void
73 swi_kill_workitem (swi_workitem_t *wi)
74 {
75         LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
76         LASSERT (!swi_data.wi_shuttingdown);
77
78         cfs_spin_lock(&swi_data.wi_lock);
79
80 #ifdef __KERNEL__
81         LASSERT (wi->wi_running);
82 #endif
83
84         if (wi->wi_scheduled) { /* cancel pending schedules */
85                 LASSERT (!cfs_list_empty(&wi->wi_list));
86                 cfs_list_del_init(&wi->wi_list);
87         }
88
89         LASSERT (cfs_list_empty(&wi->wi_list));
90         wi->wi_scheduled = 1; /* LBUG future schedule attempts */
91
92         cfs_spin_unlock(&swi_data.wi_lock);
93         return;
94 }
95
96 void
97 swi_schedule_workitem (swi_workitem_t *wi)
98 {
99         LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
100         LASSERT (!swi_data.wi_shuttingdown);
101
102         cfs_spin_lock(&swi_data.wi_lock);
103
104         if (!wi->wi_scheduled) {
105                 LASSERT (cfs_list_empty(&wi->wi_list));
106
107                 wi->wi_scheduled = 1;
108                 cfs_list_add_tail(&wi->wi_list, &swi_data.wi_runq);
109                 cfs_waitq_signal(&swi_data.wi_waitq);
110         }
111
112         LASSERT (!cfs_list_empty(&wi->wi_list));
113         cfs_spin_unlock(&swi_data.wi_lock);
114         return;
115 }
116
117 /*
118  * Workitem scheduled by this function is strictly serialised not only with
119  * itself, but also with others scheduled this way.
120  *
121  * Now there's only one static serialised queue, but in the future more might
122  * be added, and even dynamic creation of serialised queues might be supported.
123  */
124 void
125 swi_schedule_serial_workitem (swi_workitem_t *wi)
126 {
127         LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
128         LASSERT (!swi_data.wi_shuttingdown);
129
130         cfs_spin_lock(&swi_data.wi_lock);
131
132         if (!wi->wi_scheduled) {
133                 LASSERT (cfs_list_empty(&wi->wi_list));
134
135                 wi->wi_scheduled = 1;
136                 cfs_list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
137                 cfs_waitq_signal(&swi_data.wi_serial_waitq);
138         }
139
140         LASSERT (!cfs_list_empty(&wi->wi_list));
141         cfs_spin_unlock(&swi_data.wi_lock);
142         return;
143 }
144
145 #ifdef __KERNEL__
146
147 int
148 swi_scheduler_main (void *arg)
149 {
150         int  id = (int)(long_ptr_t) arg;
151         char name[16];
152
153         snprintf(name, sizeof(name), "swi_sd%03d", id);
154         cfs_daemonize(name);
155         cfs_block_allsigs();
156
157         cfs_spin_lock(&swi_data.wi_lock);
158
159         while (!swi_data.wi_shuttingdown) {
160                 int             nloops = 0;
161                 int             rc;
162                 swi_workitem_t *wi;
163
164                 while (!cfs_list_empty(&swi_data.wi_runq) &&
165                        nloops < SWI_RESCHED) {
166                         wi = cfs_list_entry(swi_data.wi_runq.next,
167                                             swi_workitem_t, wi_list);
168                         cfs_list_del_init(&wi->wi_list);
169
170                         LASSERT (wi->wi_scheduled);
171
172                         nloops++;
173                         if (wi->wi_running) {
174                                 cfs_list_add_tail(&wi->wi_list,
175                                                   &swi_data.wi_runq);
176                                 continue;
177                         }
178
179                         wi->wi_running   = 1;
180                         wi->wi_scheduled = 0;
181                         cfs_spin_unlock(&swi_data.wi_lock);
182
183                         rc = (*wi->wi_action) (wi);
184
185                         cfs_spin_lock(&swi_data.wi_lock);
186                         if (rc == 0) /* wi still active */
187                                 wi->wi_running = 0;
188                 }
189
190                 cfs_spin_unlock(&swi_data.wi_lock);
191
192                 if (nloops < SWI_RESCHED)
193                         cfs_wait_event_interruptible_exclusive(
194                                 swi_data.wi_waitq,
195                                 !swi_sched_cansleep(&swi_data.wi_runq), rc);
196                 else
197                         cfs_cond_resched();
198
199                 cfs_spin_lock(&swi_data.wi_lock);
200         }
201
202         swi_data.wi_nthreads--;
203         cfs_spin_unlock(&swi_data.wi_lock);
204         return 0;
205 }
206
207 int
208 swi_serial_scheduler_main (void *arg)
209 {
210         UNUSED (arg);
211
212         cfs_daemonize("swi_serial_sd");
213         cfs_block_allsigs();
214
215         cfs_spin_lock(&swi_data.wi_lock);
216
217         while (!swi_data.wi_shuttingdown) {
218                 int             nloops = 0;
219                 int             rc;
220                 swi_workitem_t *wi;
221
222                 while (!cfs_list_empty(&swi_data.wi_serial_runq) &&
223                        nloops < SWI_RESCHED) {
224                         wi = cfs_list_entry(swi_data.wi_serial_runq.next,
225                                             swi_workitem_t, wi_list);
226                         cfs_list_del_init(&wi->wi_list);
227
228                         LASSERTF (!wi->wi_running && wi->wi_scheduled,
229                                   "wi %p running %d scheduled %d\n",
230                                   wi, wi->wi_running, wi->wi_scheduled);
231
232                         nloops++;
233                         wi->wi_running   = 1;
234                         wi->wi_scheduled = 0;
235                         cfs_spin_unlock(&swi_data.wi_lock);
236
237                         rc = (*wi->wi_action) (wi);
238
239                         cfs_spin_lock(&swi_data.wi_lock);
240                         if (rc == 0) /* wi still active */
241                                 wi->wi_running = 0;
242                 }
243
244                 cfs_spin_unlock(&swi_data.wi_lock);
245
246                 if (nloops < SWI_RESCHED)
247                         cfs_wait_event_interruptible_exclusive(
248                                 swi_data.wi_serial_waitq,
249                                 !swi_sched_cansleep(&swi_data.wi_serial_runq),
250                                 rc);
251                 else
252                         cfs_cond_resched();
253
254                 cfs_spin_lock(&swi_data.wi_lock);
255         }
256
257         swi_data.wi_nthreads--;
258         cfs_spin_unlock(&swi_data.wi_lock);
259         return 0;
260 }
261
262 int
263 swi_start_thread (int (*func) (void*), void *arg)
264 {
265         long pid;
266
267         LASSERT (!swi_data.wi_shuttingdown);
268
269         pid = cfs_kernel_thread(func, arg, 0);
270         if (pid < 0)
271                 return (int)pid;
272
273         cfs_spin_lock(&swi_data.wi_lock);
274         swi_data.wi_nthreads++;
275         cfs_spin_unlock(&swi_data.wi_lock);
276         return 0;
277 }
278
279 #else /* __KERNEL__ */
280
281 int
282 swi_check_events (void)
283 {
284         int               n = 0;
285         swi_workitem_t   *wi;
286         cfs_list_t       *q;
287
288         cfs_spin_lock(&swi_data.wi_lock);
289
290         for (;;) {
291                 if (!cfs_list_empty(&swi_data.wi_serial_runq))
292                         q = &swi_data.wi_serial_runq;
293                 else if (!cfs_list_empty(&swi_data.wi_runq))
294                         q = &swi_data.wi_runq;
295                 else
296                         break;
297
298                 wi = cfs_list_entry(q->next, swi_workitem_t, wi_list);
299                 cfs_list_del_init(&wi->wi_list);
300
301                 LASSERT (wi->wi_scheduled);
302                 wi->wi_scheduled = 0;
303                 cfs_spin_unlock(&swi_data.wi_lock);
304
305                 n++;
306                 (*wi->wi_action) (wi);
307
308                 cfs_spin_lock(&swi_data.wi_lock);
309         }
310
311         cfs_spin_unlock(&swi_data.wi_lock);
312         return n;
313 }
314
315 #endif
316
317 int
318 swi_startup (void)
319 {
320         int i;
321         int rc;
322
323         swi_data.wi_nthreads = 0;
324         swi_data.wi_shuttingdown = 0;
325         cfs_spin_lock_init(&swi_data.wi_lock);
326         cfs_waitq_init(&swi_data.wi_waitq);
327         cfs_waitq_init(&swi_data.wi_serial_waitq);
328         CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
329         CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);
330
331 #ifdef __KERNEL__
332         rc = swi_start_thread(swi_serial_scheduler_main, NULL);
333         if (rc != 0) {
334                 LASSERT (swi_data.wi_nthreads == 0);
335                 CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
336                 return rc;
337         }
338
339         for (i = 0; i < cfs_num_online_cpus(); i++) {
340                 rc = swi_start_thread(swi_scheduler_main,
341                                       (void *) (long_ptr_t) i);
342                 if (rc != 0) {
343                         CERROR ("Can't spawn workitem scheduler: %d\n", rc);
344                         swi_shutdown();
345                         return rc;
346                 }
347         }
348 #else
349         UNUSED(i);
350         UNUSED(rc);
351 #endif
352
353         return 0;
354 }
355
356 void
357 swi_shutdown (void)
358 {
359         cfs_spin_lock(&swi_data.wi_lock);
360
361         LASSERT (cfs_list_empty(&swi_data.wi_runq));
362         LASSERT (cfs_list_empty(&swi_data.wi_serial_runq));
363
364         swi_data.wi_shuttingdown = 1;
365
366 #ifdef __KERNEL__
367         cfs_waitq_broadcast(&swi_data.wi_waitq);
368         cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
369         lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
370                        "waiting for %d threads to terminate\n",
371                        swi_data.wi_nthreads);
372 #endif
373
374         cfs_spin_unlock(&swi_data.wi_lock);
375         return;
376 }