Whamcloud - gitweb
i=liang:
[fs/lustre-release.git] / lnet / selftest / workitem.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Isaac Huang <isaac@clusterfs.com>
6  *
7  */
8 #define DEBUG_SUBSYSTEM S_LNET
9
10 #include "selftest.h"
11
12
13 struct smoketest_workitem {
14         struct list_head wi_runq;         /* concurrent workitems */
15         struct list_head wi_serial_runq;  /* serialised workitems */
16         cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
17         cfs_waitq_t      wi_serial_waitq; /* where serial scheduler sleep */
18         spinlock_t       wi_lock;         /* serialize */
19         int              wi_shuttingdown;
20         int              wi_nthreads;
21 } swi_data;
22
23 static inline int
24 swi_sched_cansleep (struct list_head *q)
25 {
26         int rc;
27
28         spin_lock(&swi_data.wi_lock);
29
30         rc = !swi_data.wi_shuttingdown && list_empty(q);
31
32         spin_unlock(&swi_data.wi_lock);
33         return rc;
34 }
35
36 /* XXX: 
37  * 0. it only works when called from wi->wi_action.
38  * 1. when it returns no one shall try to schedule the workitem.
39  */
40 void
41 swi_kill_workitem (swi_workitem_t *wi)
42 {
43         LASSERT (!in_interrupt()); /* because we use plain spinlock */
44         LASSERT (!swi_data.wi_shuttingdown);
45
46         spin_lock(&swi_data.wi_lock);
47
48 #ifdef __KERNEL__
49         LASSERT (wi->wi_running);
50 #endif
51
52         if (wi->wi_scheduled) { /* cancel pending schedules */
53                 LASSERT (!list_empty(&wi->wi_list));
54                 list_del_init(&wi->wi_list);
55         }
56
57         LASSERT (list_empty(&wi->wi_list));
58         wi->wi_scheduled = 1; /* LBUG future schedule attempts */
59
60         spin_unlock(&swi_data.wi_lock);
61         return;
62 }
63
64 void
65 swi_schedule_workitem (swi_workitem_t *wi)
66 {
67         LASSERT (!in_interrupt()); /* because we use plain spinlock */
68         LASSERT (!swi_data.wi_shuttingdown);
69
70         spin_lock(&swi_data.wi_lock);
71
72         if (!wi->wi_scheduled) {
73                 LASSERT (list_empty(&wi->wi_list));
74
75                 wi->wi_scheduled = 1;
76                 list_add_tail(&wi->wi_list, &swi_data.wi_runq);
77                 cfs_waitq_signal(&swi_data.wi_waitq);
78         }
79
80         LASSERT (!list_empty(&wi->wi_list));
81         spin_unlock(&swi_data.wi_lock);
82         return;
83 }
84
85 /*
86  * Workitem scheduled by this function is strictly serialised not only with
87  * itself, but also with others scheduled this way.
88  *
89  * Now there's only one static serialised queue, but in the future more might
90  * be added, and even dynamic creation of serialised queues might be supported.
91  */
92 void
93 swi_schedule_serial_workitem (swi_workitem_t *wi)
94 {
95         LASSERT (!in_interrupt()); /* because we use plain spinlock */
96         LASSERT (!swi_data.wi_shuttingdown);
97
98         spin_lock(&swi_data.wi_lock);
99
100         if (!wi->wi_scheduled) {
101                 LASSERT (list_empty(&wi->wi_list));
102
103                 wi->wi_scheduled = 1;
104                 list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
105                 cfs_waitq_signal(&swi_data.wi_serial_waitq);
106         }
107
108         LASSERT (!list_empty(&wi->wi_list));
109         spin_unlock(&swi_data.wi_lock);
110         return;
111 }
112
113 #ifdef __KERNEL__
114
115 int
116 swi_scheduler_main (void *arg)
117 {
118         int  id = (long) arg;
119         char name[16];
120
121         snprintf(name, sizeof(name), "swi_sd%03d", id);
122         cfs_daemonize(name);
123         cfs_block_allsigs();
124
125         spin_lock(&swi_data.wi_lock);
126
127         while (!swi_data.wi_shuttingdown) {
128                 int             nloops = 0;
129                 int             rc;
130                 swi_workitem_t *wi;
131
132                 while (!list_empty(&swi_data.wi_runq) && 
133                        nloops < SWI_RESCHED) {
134                         wi = list_entry(swi_data.wi_runq.next,
135                                         swi_workitem_t, wi_list);
136                         list_del_init(&wi->wi_list);
137
138                         LASSERT (wi->wi_scheduled);
139
140                         nloops++;
141                         if (wi->wi_running) {
142                                 list_add_tail(&wi->wi_list, &swi_data.wi_runq);
143                                 continue;
144                         }
145
146                         wi->wi_running   = 1;
147                         wi->wi_scheduled = 0;
148                         spin_unlock(&swi_data.wi_lock);
149
150                         rc = (*wi->wi_action) (wi);
151
152                         spin_lock(&swi_data.wi_lock);
153                         if (rc == 0) /* wi still active */
154                                 wi->wi_running = 0;
155                 }
156
157                 spin_unlock(&swi_data.wi_lock);
158
159                 if (nloops < SWI_RESCHED)
160                         wait_event_interruptible_exclusive(
161                                    swi_data.wi_waitq,
162                                    !swi_sched_cansleep(&swi_data.wi_runq));
163                 else
164                         our_cond_resched();
165
166                 spin_lock(&swi_data.wi_lock);
167         }
168
169         swi_data.wi_nthreads--;
170         spin_unlock(&swi_data.wi_lock);
171         return 0;
172 }
173
174 int
175 swi_serial_scheduler_main (void *arg)
176 {
177         UNUSED (arg);
178
179         cfs_daemonize("swi_serial_sd");
180         cfs_block_allsigs();
181
182         spin_lock(&swi_data.wi_lock);
183
184         while (!swi_data.wi_shuttingdown) {
185                 int             nloops = 0;
186                 int             rc;
187                 swi_workitem_t *wi;
188
189                 while (!list_empty(&swi_data.wi_serial_runq) && 
190                        nloops < SWI_RESCHED) {
191                         wi = list_entry(swi_data.wi_serial_runq.next,
192                                         swi_workitem_t, wi_list);
193                         list_del_init(&wi->wi_list);
194
195                         LASSERT (!wi->wi_running);
196                         LASSERT (wi->wi_scheduled);
197
198                         nloops++;
199                         wi->wi_running   = 1;
200                         wi->wi_scheduled = 0;
201                         spin_unlock(&swi_data.wi_lock);
202
203                         rc = (*wi->wi_action) (wi);
204
205                         spin_lock(&swi_data.wi_lock);
206                         if (rc == 0) /* wi still active */
207                                 wi->wi_running = 0;
208                 }
209
210                 spin_unlock(&swi_data.wi_lock);
211
212                 if (nloops < SWI_RESCHED)
213                         wait_event_interruptible_exclusive(
214                              swi_data.wi_serial_waitq, 
215                              !swi_sched_cansleep(&swi_data.wi_serial_runq));
216                 else
217                         our_cond_resched();
218
219                 spin_lock(&swi_data.wi_lock);
220         }
221
222         swi_data.wi_nthreads--;
223         spin_unlock(&swi_data.wi_lock);
224         return 0;
225 }
226
227 int
228 swi_start_thread (int (*func) (void*), void *arg)
229 {
230         long pid;
231
232         LASSERT (!swi_data.wi_shuttingdown);
233
234         pid = cfs_kernel_thread(func, arg, 0);
235         if (pid < 0)
236                 return (int)pid;
237
238         spin_lock(&swi_data.wi_lock);
239         swi_data.wi_nthreads++;
240         spin_unlock(&swi_data.wi_lock);
241         return 0;
242 }
243
244 #else /* __KERNEL__ */
245
246 int
247 swi_check_events (void)
248 {
249         int               n = 0;
250         swi_workitem_t   *wi;
251         struct list_head *q;
252
253         spin_lock(&swi_data.wi_lock);
254
255         for (;;) {
256                 if (!list_empty(&swi_data.wi_serial_runq))
257                         q = &swi_data.wi_serial_runq;
258                 else if (!list_empty(&swi_data.wi_runq))
259                         q = &swi_data.wi_runq;
260                 else
261                         break;
262                                
263                 wi = list_entry(q->next, swi_workitem_t, wi_list);
264                 list_del_init(&wi->wi_list);
265
266                 LASSERT (wi->wi_scheduled);
267                 wi->wi_scheduled = 0;
268                 spin_unlock(&swi_data.wi_lock);
269
270                 n++;
271                 (*wi->wi_action) (wi);
272
273                 spin_lock(&swi_data.wi_lock);
274         }
275
276         spin_unlock(&swi_data.wi_lock);
277         return n;
278 }
279
280 #endif
281
282 int
283 swi_startup (void)
284 {
285         int i;
286         int rc;
287
288         swi_data.wi_nthreads = 0;
289         swi_data.wi_shuttingdown = 0;
290         spin_lock_init(&swi_data.wi_lock);
291         cfs_waitq_init(&swi_data.wi_waitq);
292         cfs_waitq_init(&swi_data.wi_serial_waitq);
293         CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
294         CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);
295
296 #ifdef __KERNEL__
297         rc = swi_start_thread(swi_serial_scheduler_main, NULL);
298         if (rc != 0) {
299                 LASSERT (swi_data.wi_nthreads == 0);
300                 CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
301                 return rc;
302         }
303
304         for (i = 0; i < num_online_cpus(); i++) {
305                 rc = swi_start_thread(swi_scheduler_main, (void *) (long) i);
306                 if (rc != 0) {
307                         CERROR ("Can't spawn workitem scheduler: %d\n", rc);
308                         swi_shutdown();
309                         return rc;
310                 }
311         }
312 #else
313         UNUSED(i);
314         UNUSED(rc);
315 #endif
316
317         return 0;
318 }
319
320 void
321 swi_shutdown (void)
322 {
323         spin_lock(&swi_data.wi_lock);
324
325         LASSERT (list_empty(&swi_data.wi_runq));
326         LASSERT (list_empty(&swi_data.wi_serial_runq));
327
328         swi_data.wi_shuttingdown = 1;
329
330 #ifdef __KERNEL__
331         cfs_waitq_broadcast(&swi_data.wi_waitq);
332         cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
333         lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
334                        "waiting for %d threads to terminate\n",
335                        swi_data.wi_nthreads);
336 #endif
337
338         spin_unlock(&swi_data.wi_lock);
339         return;
340 }