Whamcloud - gitweb
i=liangzhen,b=18075:
[fs/lustre-release.git] / lnet / selftest / workitem.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/selftest/workitem.c
37  *
38  * Author: Isaac Huang <isaac@clusterfs.com>
39  */
40 #define DEBUG_SUBSYSTEM S_LNET
41
42 #include "selftest.h"
43
44
45 struct smoketest_workitem {
46         struct list_head wi_runq;         /* concurrent workitems */
47         struct list_head wi_serial_runq;  /* serialised workitems */
48         cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
49         cfs_waitq_t      wi_serial_waitq; /* where serial scheduler sleep */
50         spinlock_t       wi_lock;         /* serialize */
51         int              wi_shuttingdown;
52         int              wi_nthreads;
53 } swi_data;
54
55 static inline int
56 swi_sched_cansleep (struct list_head *q)
57 {
58         int rc;
59
60         spin_lock(&swi_data.wi_lock);
61
62         rc = !swi_data.wi_shuttingdown && list_empty(q);
63
64         spin_unlock(&swi_data.wi_lock);
65         return rc;
66 }
67
68 /* XXX: 
69  * 0. it only works when called from wi->wi_action.
70  * 1. when it returns no one shall try to schedule the workitem.
71  */
72 void
73 swi_kill_workitem (swi_workitem_t *wi)
74 {
75         LASSERT (!in_interrupt()); /* because we use plain spinlock */
76         LASSERT (!swi_data.wi_shuttingdown);
77
78         spin_lock(&swi_data.wi_lock);
79
80 #ifdef __KERNEL__
81         LASSERT (wi->wi_running);
82 #endif
83
84         if (wi->wi_scheduled) { /* cancel pending schedules */
85                 LASSERT (!list_empty(&wi->wi_list));
86                 list_del_init(&wi->wi_list);
87         }
88
89         LASSERT (list_empty(&wi->wi_list));
90         wi->wi_scheduled = 1; /* LBUG future schedule attempts */
91
92         spin_unlock(&swi_data.wi_lock);
93         return;
94 }
95
96 void
97 swi_schedule_workitem (swi_workitem_t *wi)
98 {
99         LASSERT (!in_interrupt()); /* because we use plain spinlock */
100         LASSERT (!swi_data.wi_shuttingdown);
101
102         spin_lock(&swi_data.wi_lock);
103
104         if (!wi->wi_scheduled) {
105                 LASSERT (list_empty(&wi->wi_list));
106
107                 wi->wi_scheduled = 1;
108                 list_add_tail(&wi->wi_list, &swi_data.wi_runq);
109                 cfs_waitq_signal(&swi_data.wi_waitq);
110         }
111
112         LASSERT (!list_empty(&wi->wi_list));
113         spin_unlock(&swi_data.wi_lock);
114         return;
115 }
116
117 /*
118  * Workitem scheduled by this function is strictly serialised not only with
119  * itself, but also with others scheduled this way.
120  *
121  * Now there's only one static serialised queue, but in the future more might
122  * be added, and even dynamic creation of serialised queues might be supported.
123  */
124 void
125 swi_schedule_serial_workitem (swi_workitem_t *wi)
126 {
127         LASSERT (!in_interrupt()); /* because we use plain spinlock */
128         LASSERT (!swi_data.wi_shuttingdown);
129
130         spin_lock(&swi_data.wi_lock);
131
132         if (!wi->wi_scheduled) {
133                 LASSERT (list_empty(&wi->wi_list));
134
135                 wi->wi_scheduled = 1;
136                 list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
137                 cfs_waitq_signal(&swi_data.wi_serial_waitq);
138         }
139
140         LASSERT (!list_empty(&wi->wi_list));
141         spin_unlock(&swi_data.wi_lock);
142         return;
143 }
144
145 #ifdef __KERNEL__
146
147 int
148 swi_scheduler_main (void *arg)
149 {
150         int  id = (int)(long_ptr_t) arg;
151         char name[16];
152
153         snprintf(name, sizeof(name), "swi_sd%03d", id);
154         cfs_daemonize(name);
155         cfs_block_allsigs();
156
157         spin_lock(&swi_data.wi_lock);
158
159         while (!swi_data.wi_shuttingdown) {
160                 int             nloops = 0;
161                 int             rc;
162                 swi_workitem_t *wi;
163
164                 while (!list_empty(&swi_data.wi_runq) && 
165                        nloops < SWI_RESCHED) {
166                         wi = list_entry(swi_data.wi_runq.next,
167                                         swi_workitem_t, wi_list);
168                         list_del_init(&wi->wi_list);
169
170                         LASSERT (wi->wi_scheduled);
171
172                         nloops++;
173                         if (wi->wi_running) {
174                                 list_add_tail(&wi->wi_list, &swi_data.wi_runq);
175                                 continue;
176                         }
177
178                         wi->wi_running   = 1;
179                         wi->wi_scheduled = 0;
180                         spin_unlock(&swi_data.wi_lock);
181
182                         rc = (*wi->wi_action) (wi);
183
184                         spin_lock(&swi_data.wi_lock);
185                         if (rc == 0) /* wi still active */
186                                 wi->wi_running = 0;
187                 }
188
189                 spin_unlock(&swi_data.wi_lock);
190
191                 if (nloops < SWI_RESCHED)
192                         cfs_wait_event_interruptible_exclusive(
193                                    swi_data.wi_waitq,
194                                    !swi_sched_cansleep(&swi_data.wi_runq), rc);
195                 else
196                         our_cond_resched();
197
198                 spin_lock(&swi_data.wi_lock);
199         }
200
201         swi_data.wi_nthreads--;
202         spin_unlock(&swi_data.wi_lock);
203         return 0;
204 }
205
206 int
207 swi_serial_scheduler_main (void *arg)
208 {
209         UNUSED (arg);
210
211         cfs_daemonize("swi_serial_sd");
212         cfs_block_allsigs();
213
214         spin_lock(&swi_data.wi_lock);
215
216         while (!swi_data.wi_shuttingdown) {
217                 int             nloops = 0;
218                 int             rc;
219                 swi_workitem_t *wi;
220
221                 while (!list_empty(&swi_data.wi_serial_runq) &&
222                        nloops < SWI_RESCHED) {
223                         wi = list_entry(swi_data.wi_serial_runq.next,
224                                         swi_workitem_t, wi_list);
225                         list_del_init(&wi->wi_list);
226
227                         LASSERTF (!wi->wi_running && wi->wi_scheduled,
228                                   "wi %p running %d scheduled %d\n",
229                                   wi, wi->wi_running, wi->wi_scheduled);
230
231                         nloops++;
232                         wi->wi_running   = 1;
233                         wi->wi_scheduled = 0;
234                         spin_unlock(&swi_data.wi_lock);
235
236                         rc = (*wi->wi_action) (wi);
237
238                         spin_lock(&swi_data.wi_lock);
239                         if (rc == 0) /* wi still active */
240                                 wi->wi_running = 0;
241                 }
242
243                 spin_unlock(&swi_data.wi_lock);
244
245                 if (nloops < SWI_RESCHED)
246                         cfs_wait_event_interruptible_exclusive(
247                              swi_data.wi_serial_waitq,
248                              !swi_sched_cansleep(&swi_data.wi_serial_runq), rc);
249                 else
250                         our_cond_resched();
251
252                 spin_lock(&swi_data.wi_lock);
253         }
254
255         swi_data.wi_nthreads--;
256         spin_unlock(&swi_data.wi_lock);
257         return 0;
258 }
259
260 int
261 swi_start_thread (int (*func) (void*), void *arg)
262 {
263         long pid;
264
265         LASSERT (!swi_data.wi_shuttingdown);
266
267         pid = cfs_kernel_thread(func, arg, 0);
268         if (pid < 0)
269                 return (int)pid;
270
271         spin_lock(&swi_data.wi_lock);
272         swi_data.wi_nthreads++;
273         spin_unlock(&swi_data.wi_lock);
274         return 0;
275 }
276
277 #else /* __KERNEL__ */
278
279 int
280 swi_check_events (void)
281 {
282         int               n = 0;
283         swi_workitem_t   *wi;
284         struct list_head *q;
285
286         spin_lock(&swi_data.wi_lock);
287
288         for (;;) {
289                 if (!list_empty(&swi_data.wi_serial_runq))
290                         q = &swi_data.wi_serial_runq;
291                 else if (!list_empty(&swi_data.wi_runq))
292                         q = &swi_data.wi_runq;
293                 else
294                         break;
295
296                 wi = list_entry(q->next, swi_workitem_t, wi_list);
297                 list_del_init(&wi->wi_list);
298
299                 LASSERT (wi->wi_scheduled);
300                 wi->wi_scheduled = 0;
301                 spin_unlock(&swi_data.wi_lock);
302
303                 n++;
304                 (*wi->wi_action) (wi);
305
306                 spin_lock(&swi_data.wi_lock);
307         }
308
309         spin_unlock(&swi_data.wi_lock);
310         return n;
311 }
312
313 #endif
314
315 int
316 swi_startup (void)
317 {
318         int i;
319         int rc;
320
321         swi_data.wi_nthreads = 0;
322         swi_data.wi_shuttingdown = 0;
323         spin_lock_init(&swi_data.wi_lock);
324         cfs_waitq_init(&swi_data.wi_waitq);
325         cfs_waitq_init(&swi_data.wi_serial_waitq);
326         CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
327         CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);
328
329 #ifdef __KERNEL__
330         rc = swi_start_thread(swi_serial_scheduler_main, NULL);
331         if (rc != 0) {
332                 LASSERT (swi_data.wi_nthreads == 0);
333                 CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
334                 return rc;
335         }
336
337         for (i = 0; i < num_online_cpus(); i++) {
338                 rc = swi_start_thread(swi_scheduler_main,
339                                       (void *) (long_ptr_t) i);
340                 if (rc != 0) {
341                         CERROR ("Can't spawn workitem scheduler: %d\n", rc);
342                         swi_shutdown();
343                         return rc;
344                 }
345         }
346 #else
347         UNUSED(i);
348         UNUSED(rc);
349 #endif
350
351         return 0;
352 }
353
354 void
355 swi_shutdown (void)
356 {
357         spin_lock(&swi_data.wi_lock);
358
359         LASSERT (list_empty(&swi_data.wi_runq));
360         LASSERT (list_empty(&swi_data.wi_serial_runq));
361
362         swi_data.wi_shuttingdown = 1;
363
364 #ifdef __KERNEL__
365         cfs_waitq_broadcast(&swi_data.wi_waitq);
366         cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
367         lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
368                        "waiting for %d threads to terminate\n",
369                        swi_data.wi_nthreads);
370 #endif
371
372         spin_unlock(&swi_data.wi_lock);
373         return;
374 }