/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lnet/selftest/workitem.c
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */
#define DEBUG_SUBSYSTEM S_LNET

#include <libcfs/kp30.h>
#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>
#include "selftest.h"

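/*
 * Global state shared by all workitem scheduler threads: the two run
 * queues, the waitqueues the schedulers sleep on, and the lock that
 * serialises access to everything below.
 */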
struct smoketest_workitem {
        struct list_head wi_runq;         /* concurrent workitems */
        struct list_head wi_serial_runq;  /* serialised workitems */
        cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
        cfs_waitq_t      wi_serial_waitq; /* where the serial scheduler sleeps */
        spinlock_t       wi_lock;         /* serialize */
        int              wi_shuttingdown;
        int              wi_nthreads;
} swi_data;

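/*
 * Return nonzero if a scheduler thread may go to sleep: the run queue
 * 'q' is empty and we are not shutting down.  Takes wi_lock so the
 * check is consistent with the schedulers' own view of the queue.
 */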
static inline int
swi_sched_cansleep (struct list_head *q)
{
        int rc;

        spin_lock(&swi_data.wi_lock);

        rc = !swi_data.wi_shuttingdown && list_empty(q);

        spin_unlock(&swi_data.wi_lock);
        return rc;
}

/* XXX:
 * 0. It only works when called from within wi->wi_action.
 * 1. When it returns, no one shall try to schedule the workitem again.
 */
void
swi_kill_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

#ifdef __KERNEL__
        LASSERT (wi->wi_running);
#endif

        if (wi->wi_scheduled) { /* cancel pending schedules */
                LASSERT (!list_empty(&wi->wi_list));
                list_del_init(&wi->wi_list);
        }

        LASSERT (list_empty(&wi->wi_list));
        wi->wi_scheduled = 1; /* LBUG future schedule attempts */

        spin_unlock(&swi_data.wi_lock);
        return;
}

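/*
 * Queue 'wi' on the concurrent run queue and wake one scheduler thread.
 * A no-op if the workitem is already scheduled; it is safe to call
 * while the workitem is running, in which case it will simply run
 * again once the current invocation completes.
 */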
void
swi_schedule_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

        if (!wi->wi_scheduled) {
                LASSERT (list_empty(&wi->wi_list));

                wi->wi_scheduled = 1;
                list_add_tail(&wi->wi_list, &swi_data.wi_runq);
                cfs_waitq_signal(&swi_data.wi_waitq);
        }

        LASSERT (!list_empty(&wi->wi_list));
        spin_unlock(&swi_data.wi_lock);
        return;
}

/*
 * A workitem scheduled by this function is strictly serialised, not only
 * with itself but also with all other workitems scheduled this way.
 *
 * For now there is only one static serialised queue, but more might be
 * added in the future, and dynamic creation of serialised queues might
 * eventually be supported.
 */
void
swi_schedule_serial_workitem (swi_workitem_t *wi)
{
        LASSERT (!in_interrupt()); /* because we use plain spinlock */
        LASSERT (!swi_data.wi_shuttingdown);

        spin_lock(&swi_data.wi_lock);

        if (!wi->wi_scheduled) {
                LASSERT (list_empty(&wi->wi_list));

                wi->wi_scheduled = 1;
                list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
                cfs_waitq_signal(&swi_data.wi_serial_waitq);
        }

        LASSERT (!list_empty(&wi->wi_list));
        spin_unlock(&swi_data.wi_lock);
        return;
}

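/*
 * Typical caller-side usage, as an illustrative sketch only.  The
 * swi_init_workitem() helper and my_action() callback below are assumed
 * to exist elsewhere (e.g. in selftest.h) and are not defined here:
 *
 *      static int
 *      my_action (swi_workitem_t *wi)
 *      {
 *              do a bounded amount of work;
 *              return 0;   0 means the workitem stays alive and may be
 *                          scheduled again; a non-zero return tells the
 *                          scheduler the workitem is gone and must not
 *                          be touched (see swi_kill_workitem() above).
 *      }
 *
 *      swi_init_workitem(&wi, data, my_action);
 *      swi_schedule_workitem(&wi);           run on any scheduler thread
 *      swi_schedule_serial_workitem(&wi);    or: strictly serialised
 */
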
#ifdef __KERNEL__

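/*
 * Main loop of one concurrent scheduler thread.  It dequeues up to
 * SWI_RESCHED workitems per pass and runs their actions with wi_lock
 * dropped; a workitem whose previous invocation is still running on
 * another thread is put back at the tail of the queue.  When the queue
 * is empty the thread sleeps on wi_waitq; after SWI_RESCHED items it
 * yields the CPU instead.
 */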
int
swi_scheduler_main (void *arg)
{
        int  id = (long) arg;
        char name[16];

        snprintf(name, sizeof(name), "swi_sd%03d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        spin_lock(&swi_data.wi_lock);

        while (!swi_data.wi_shuttingdown) {
                int             nloops = 0;
                int             rc;
                swi_workitem_t *wi;

                while (!list_empty(&swi_data.wi_runq) &&
                       nloops < SWI_RESCHED) {
                        wi = list_entry(swi_data.wi_runq.next,
                                        swi_workitem_t, wi_list);
                        list_del_init(&wi->wi_list);

                        LASSERT (wi->wi_scheduled);

                        nloops++;
                        if (wi->wi_running) {
                                list_add_tail(&wi->wi_list, &swi_data.wi_runq);
                                continue;
                        }

                        wi->wi_running   = 1;
                        wi->wi_scheduled = 0;
                        spin_unlock(&swi_data.wi_lock);

                        rc = (*wi->wi_action) (wi);

                        spin_lock(&swi_data.wi_lock);
                        if (rc == 0) /* wi still active */
                                wi->wi_running = 0;
                }

                spin_unlock(&swi_data.wi_lock);

                if (nloops < SWI_RESCHED)
                        wait_event_interruptible_exclusive(
                                   swi_data.wi_waitq,
                                   !swi_sched_cansleep(&swi_data.wi_runq));
                else
                        our_cond_resched();

                spin_lock(&swi_data.wi_lock);
        }

        swi_data.wi_nthreads--;
        spin_unlock(&swi_data.wi_lock);
        return 0;
}

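/*
 * Main loop of the single serial scheduler thread.  Identical in
 * structure to swi_scheduler_main(), but it serves wi_serial_runq and,
 * since there is only one such thread, a dequeued workitem can never
 * still be running (hence the LASSERT on wi_running).
 */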
int
swi_serial_scheduler_main (void *arg)
{
        UNUSED (arg);

        cfs_daemonize("swi_serial_sd");
        cfs_block_allsigs();

        spin_lock(&swi_data.wi_lock);

        while (!swi_data.wi_shuttingdown) {
                int             nloops = 0;
                int             rc;
                swi_workitem_t *wi;

                while (!list_empty(&swi_data.wi_serial_runq) &&
                       nloops < SWI_RESCHED) {
                        wi = list_entry(swi_data.wi_serial_runq.next,
                                        swi_workitem_t, wi_list);
                        list_del_init(&wi->wi_list);

                        LASSERT (!wi->wi_running);
                        LASSERT (wi->wi_scheduled);

                        nloops++;
                        wi->wi_running   = 1;
                        wi->wi_scheduled = 0;
                        spin_unlock(&swi_data.wi_lock);

                        rc = (*wi->wi_action) (wi);

                        spin_lock(&swi_data.wi_lock);
                        if (rc == 0) /* wi still active */
                                wi->wi_running = 0;
                }

                spin_unlock(&swi_data.wi_lock);

                if (nloops < SWI_RESCHED)
                        wait_event_interruptible_exclusive(
                             swi_data.wi_serial_waitq,
                             !swi_sched_cansleep(&swi_data.wi_serial_runq));
                else
                        our_cond_resched();

                spin_lock(&swi_data.wi_lock);
        }

        swi_data.wi_nthreads--;
        spin_unlock(&swi_data.wi_lock);
        return 0;
}

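/*
 * Spawn one scheduler thread running 'func' and account for it in
 * wi_nthreads.  Returns 0 on success or the (negative) error from
 * cfs_kernel_thread().
 */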
int
swi_start_thread (int (*func) (void*), void *arg)
{
        long pid;

        LASSERT (!swi_data.wi_shuttingdown);

        pid = cfs_kernel_thread(func, arg, 0);
        if (pid < 0)
                return (int)pid;

        spin_lock(&swi_data.wi_lock);
        swi_data.wi_nthreads++;
        spin_unlock(&swi_data.wi_lock);
        return 0;
}

#else /* __KERNEL__ */

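/*
 * Userspace has no scheduler threads; callers poll this instead.  It
 * drains the serial queue first, then the concurrent one, running each
 * action with wi_lock dropped, and returns the number of workitems
 * processed.
 */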
int
swi_check_events (void)
{
        int               n = 0;
        swi_workitem_t   *wi;
        struct list_head *q;

        spin_lock(&swi_data.wi_lock);

        for (;;) {
                if (!list_empty(&swi_data.wi_serial_runq))
                        q = &swi_data.wi_serial_runq;
                else if (!list_empty(&swi_data.wi_runq))
                        q = &swi_data.wi_runq;
                else
                        break;

                wi = list_entry(q->next, swi_workitem_t, wi_list);
                list_del_init(&wi->wi_list);

                LASSERT (wi->wi_scheduled);
                wi->wi_scheduled = 0;
                spin_unlock(&swi_data.wi_lock);

                n++;
                (*wi->wi_action) (wi);

                spin_lock(&swi_data.wi_lock);
        }

        spin_unlock(&swi_data.wi_lock);
        return n;
}

#endif

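/*
 * Initialise swi_data and, in the kernel, start the serial scheduler
 * plus one concurrent scheduler per online CPU.  On failure anything
 * already started is torn down and the error is returned.
 */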
int
swi_startup (void)
{
        int i;
        int rc;

        swi_data.wi_nthreads = 0;
        swi_data.wi_shuttingdown = 0;
        spin_lock_init(&swi_data.wi_lock);
        cfs_waitq_init(&swi_data.wi_waitq);
        cfs_waitq_init(&swi_data.wi_serial_waitq);
        CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
        CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);

#ifdef __KERNEL__
        rc = swi_start_thread(swi_serial_scheduler_main, NULL);
        if (rc != 0) {
                LASSERT (swi_data.wi_nthreads == 0);
                CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
                return rc;
        }

        for (i = 0; i < num_online_cpus(); i++) {
                rc = swi_start_thread(swi_scheduler_main, (void *) (long) i);
                if (rc != 0) {
                        CERROR ("Can't spawn workitem scheduler: %d\n", rc);
                        swi_shutdown();
                        return rc;
                }
        }
#else
        UNUSED(i);
        UNUSED(rc);
#endif

        return 0;
}

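/*
 * Flag shutdown and, in the kernel, wake all scheduler threads and wait
 * for them to exit.  Both run queues must already be empty.
 */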
void
swi_shutdown (void)
{
        spin_lock(&swi_data.wi_lock);

        LASSERT (list_empty(&swi_data.wi_runq));
        LASSERT (list_empty(&swi_data.wi_serial_runq));

        swi_data.wi_shuttingdown = 1;

#ifdef __KERNEL__
        cfs_waitq_broadcast(&swi_data.wi_waitq);
        cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
        lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
                       "waiting for %d threads to terminate\n",
                       swi_data.wi_nthreads);
#endif

        spin_unlock(&swi_data.wi_lock);
        return;
}