Whamcloud - gitweb
LU-9399 llite: register mountpoint before process llog
[fs/lustre-release.git] / libcfs / libcfs / libcfs_ptask.c
1 #include <linux/errno.h>
2 #include <linux/kernel.h>
3 #include <linux/module.h>
4 #include <linux/cpumask.h>
5 #include <linux/cpu.h>
6 #include <linux/slab.h>
7 #include <linux/sched.h>
8 #include <linux/moduleparam.h>
9 #include <linux/mmu_context.h>
10
11 #define DEBUG_SUBSYSTEM S_UNDEFINED
12
13 #include <libcfs/libcfs.h>
14 #include <libcfs/libcfs_ptask.h>
15
16 /**
17  * This API is based on the Linux kernel padata API, which is used to
18  * perform encryption and decryption on large numbers of packets without
19  * reordering those packets.
20  *
21  * It was adopted for general use in Lustre for parallelization of
22  * various functionality.
23  *
24  * The first step in using it is to set up a cfs_ptask structure to
25  * control how the task is to be run:
26  *
27  * #include <libcfs/libcfs_ptask.h>
28  *
29  * int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
30  *                    void *cbdata, unsigned int flags, int cpu);
31  *
32  * The cbfunc function with cbdata argument will be called in the process
33  * of getting the task done. The cpu specifies which CPU will be used for
34  * the final callback when the task is done.
35  *
36  * The submission of task is done with:
37  *
38  * int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine);
39  *
40  * The task is submitted to the engine for execution.
41  *
42  * In order to wait for result of task execution you should call:
43  *
44  * int cfs_ptask_wait_for(struct cfs_ptask *ptask);
45  *
46  * Tasks with the PTF_ORDERED flag are executed in parallel but complete
47  * in submission order. So, by waiting for the last ordered task you can be
48  * sure that all previous tasks were done before this task completes.
49  */
50
51 #ifndef HAVE_REINIT_COMPLETION
52 /**
53  * reinit_completion - reinitialize a completion structure
54  * @x:  pointer to completion structure that is to be reinitialized
55  *
56  * This inline function should be used to reinitialize a completion
57  * structure so it can be reused. This is especially important after
58  * complete_all() is used.
59  */
60 static inline void reinit_completion(struct completion *x)
61 {
62         x->done = 0;
63 }
64 #endif
65
66 #ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
67 static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
68                                            const struct cpumask *mask)
69 {
70         cpulist_scnprintf(buf, PAGE_SIZE, mask);
71 }
72 #endif
73
74 #ifdef CONFIG_PADATA
75 static void cfs_ptask_complete(struct padata_priv *padata)
76 {
77         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
78
79         if (cfs_ptask_need_complete(ptask)) {
80                 if (cfs_ptask_is_ordered(ptask))
81                         complete(&ptask->pt_completion);
82         } else if (cfs_ptask_is_autofree(ptask)) {
83                 kfree(ptask);
84         }
85 }
86
87 static void cfs_ptask_execute(struct padata_priv *padata)
88 {
89         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
90         mm_segment_t old_fs = get_fs();
91         bool bh_enabled = false;
92
93         if (!cfs_ptask_is_atomic(ptask)) {
94                 local_bh_enable();
95                 bh_enabled = true;
96         }
97
98         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
99                 use_mm(ptask->pt_mm);
100                 set_fs(ptask->pt_fs);
101         }
102
103         if (ptask->pt_cbfunc != NULL)
104                 ptask->pt_result = ptask->pt_cbfunc(ptask);
105         else
106                 ptask->pt_result = -ENOSYS;
107
108         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
109                 set_fs(old_fs);
110                 unuse_mm(ptask->pt_mm);
111                 mmput(ptask->pt_mm);
112                 ptask->pt_mm = NULL;
113         }
114
115         if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
116                 complete(&ptask->pt_completion);
117
118         if (bh_enabled)
119                 local_bh_disable();
120
121         padata_do_serial(padata);
122 }
123
124 static int cfs_do_parallel(struct cfs_ptask_engine *engine,
125                            struct padata_priv *padata)
126 {
127         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
128         int rc;
129
130         if (cfs_ptask_need_complete(ptask))
131                 reinit_completion(&ptask->pt_completion);
132
133         if (cfs_ptask_use_user_mm(ptask)) {
134                 ptask->pt_mm = get_task_mm(current);
135                 ptask->pt_fs = get_fs();
136         }
137         ptask->pt_result = -EINPROGRESS;
138
139 retry:
140         rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
141         if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
142                 /* too many tasks already in queue */
143                 schedule_timeout_uninterruptible(1);
144                 goto retry;
145         }
146
147         if (rc) {
148                 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
149                         mmput(ptask->pt_mm);
150                         ptask->pt_mm = NULL;
151                 }
152                 ptask->pt_result = rc;
153         }
154
155         return rc;
156 }
157
158 /**
159  * This function submit initialized task for async execution
160  * in engine with specified id.
161  */
162 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
163 {
164         struct padata_priv *padata = cfs_ptask2padata(ptask);
165
166         if (IS_ERR_OR_NULL(engine))
167                 return -EINVAL;
168
169         memset(padata, 0, sizeof(*padata));
170
171         padata->parallel = cfs_ptask_execute;
172         padata->serial   = cfs_ptask_complete;
173
174         return cfs_do_parallel(engine, padata);
175 }
176
177 #else  /* !CONFIG_PADATA */
178
179 /**
180  * If CONFIG_PADATA is not defined this function just execute
181  * the initialized task in current thread. (emulate async execution)
182  */
183 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
184 {
185         if (IS_ERR_OR_NULL(engine))
186                 return -EINVAL;
187
188         if (ptask->pt_cbfunc != NULL)
189                 ptask->pt_result = ptask->pt_cbfunc(ptask);
190         else
191                 ptask->pt_result = -ENOSYS;
192
193         if (cfs_ptask_need_complete(ptask))
194                 complete(&ptask->pt_completion);
195         else if (cfs_ptask_is_autofree(ptask))
196                 kfree(ptask);
197
198         return 0;
199 }
200 #endif /* CONFIG_PADATA */
201
202 EXPORT_SYMBOL(cfs_ptask_submit);
203
204 /**
205  * This function waits when task complete async execution.
206  * The tasks with flag PTF_ORDERED are executed in parallel but completes
207  * into submission order. So, waiting for last ordered task you can be sure
208  * that all previous tasks were done before this task complete.
209  */
210 int cfs_ptask_wait_for(struct cfs_ptask *ptask)
211 {
212         if (!cfs_ptask_need_complete(ptask))
213                 return -EINVAL;
214
215         wait_for_completion(&ptask->pt_completion);
216
217         return 0;
218 }
219 EXPORT_SYMBOL(cfs_ptask_wait_for);
220
221 /**
222  * This function initialize internal members of task and prepare it for
223  * async execution.
224  */
225 int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
226                    unsigned int flags, int cpu)
227 {
228         memset(ptask, 0, sizeof(*ptask));
229
230         ptask->pt_flags  = flags;
231         ptask->pt_cbcpu  = cpu;
232         ptask->pt_mm     = NULL; /* will be set in cfs_do_parallel() */
233         ptask->pt_fs     = get_fs();
234         ptask->pt_cbfunc = cbfunc;
235         ptask->pt_cbdata = cbdata;
236         ptask->pt_result = -EAGAIN;
237
238         if (cfs_ptask_need_complete(ptask)) {
239                 if (cfs_ptask_is_autofree(ptask))
240                         return -EINVAL;
241
242                 init_completion(&ptask->pt_completion);
243         }
244
245         if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
246                 return -EINVAL;
247
248         return 0;
249 }
250 EXPORT_SYMBOL(cfs_ptask_init);
251
252 /**
253  * This function set the mask of allowed CPUs for parallel execution
254  * for engine with specified id.
255  */
256 int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
257                              const struct cpumask *cpumask)
258 {
259         int rc = 0;
260
261 #ifdef CONFIG_PADATA
262         cpumask_var_t serial_mask;
263         cpumask_var_t parallel_mask;
264
265         if (IS_ERR_OR_NULL(engine))
266                 return -EINVAL;
267
268         if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
269                 return -ENOMEM;
270
271         if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
272                 free_cpumask_var(serial_mask);
273                 return -ENOMEM;
274         }
275
276         cpumask_copy(parallel_mask, cpumask);
277         cpumask_copy(serial_mask, cpu_online_mask);
278
279         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_PARALLEL,
280                                 parallel_mask);
281         free_cpumask_var(parallel_mask);
282         if (rc)
283                 goto out_failed_mask;
284
285         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_SERIAL,
286                                 serial_mask);
287 out_failed_mask:
288         free_cpumask_var(serial_mask);
289 #endif /* CONFIG_PADATA */
290
291         return rc;
292 }
293 EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
294
295 /**
296  * This function returns the count of allowed CPUs for parallel execution
297  * for engine with specified id.
298  */
299 int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
300 {
301         if (IS_ERR_OR_NULL(engine))
302                 return -EINVAL;
303
304         return engine->pte_weight;
305 }
306 EXPORT_SYMBOL(cfs_ptengine_weight);
307
308 #ifdef CONFIG_PADATA
309 static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
310                                            unsigned long val, void *data)
311 {
312         struct padata_cpumask *padata_cpumask = data;
313         struct cfs_ptask_engine *engine;
314
315         engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
316
317         if (val & PADATA_CPU_PARALLEL)
318                 engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
319
320         return 0;
321 }
322
323 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
324                                     const char *name,
325                                     const struct cpumask *cpumask)
326 {
327         cpumask_var_t all_mask;
328         cpumask_var_t par_mask;
329         unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
330         int rc;
331
332         get_online_cpus();
333
334         engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
335         if (engine->pte_wq == NULL)
336                 GOTO(err, rc = -ENOMEM);
337
338         if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
339                 GOTO(err_destroy_workqueue, rc = -ENOMEM);
340
341         if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
342                 GOTO(err_free_all_mask, rc = -ENOMEM);
343
344         cpumask_copy(par_mask, cpumask);
345         if (cpumask_empty(par_mask) ||
346             cpumask_equal(par_mask, cpu_online_mask)) {
347                 cpumask_copy(all_mask, cpu_online_mask);
348                 cpumask_clear(par_mask);
349                 while (!cpumask_empty(all_mask)) {
350                         int cpu = cpumask_first(all_mask);
351
352                         cpumask_set_cpu(cpu, par_mask);
353                         cpumask_andnot(all_mask, all_mask,
354                                         topology_sibling_cpumask(cpu));
355                 }
356         }
357
358         cpumask_copy(all_mask, cpu_online_mask);
359
360         {
361                 char *pa_mask_buff, *cb_mask_buff;
362
363                 pa_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
364                 if (pa_mask_buff == NULL)
365                         GOTO(err_free_par_mask, rc = -ENOMEM);
366
367                 cb_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
368                 if (cb_mask_buff == NULL) {
369                         free_page((unsigned long)pa_mask_buff);
370                         GOTO(err_free_par_mask, rc = -ENOMEM);
371                 }
372
373                 cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
374                 pa_mask_buff[PAGE_SIZE - 1] = '\0';
375                 cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
376                 cb_mask_buff[PAGE_SIZE - 1] = '\0';
377
378                 CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
379                         name, cpumask_weight(par_mask),
380                         pa_mask_buff, cb_mask_buff);
381
382                 free_page((unsigned long)cb_mask_buff);
383                 free_page((unsigned long)pa_mask_buff);
384         }
385
386         engine->pte_weight = cpumask_weight(par_mask);
387         engine->pte_pinst  = padata_alloc_possible(engine->pte_wq);
388         if (engine->pte_pinst == NULL)
389                 GOTO(err_free_par_mask, rc = -ENOMEM);
390
391         engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
392         rc = padata_register_cpumask_notifier(engine->pte_pinst,
393                                               &engine->pte_notifier);
394         if (rc)
395                 GOTO(err_free_padata, rc);
396
397         rc = cfs_ptengine_set_cpumask(engine, par_mask);
398         if (rc)
399                 GOTO(err_unregister, rc);
400
401         rc = padata_start(engine->pte_pinst);
402         if (rc)
403                 GOTO(err_unregister, rc);
404
405         free_cpumask_var(par_mask);
406         free_cpumask_var(all_mask);
407
408         put_online_cpus();
409         return 0;
410
411 err_unregister:
412         padata_unregister_cpumask_notifier(engine->pte_pinst,
413                                            &engine->pte_notifier);
414 err_free_padata:
415         padata_free(engine->pte_pinst);
416 err_free_par_mask:
417         free_cpumask_var(par_mask);
418 err_free_all_mask:
419         free_cpumask_var(all_mask);
420 err_destroy_workqueue:
421         destroy_workqueue(engine->pte_wq);
422 err:
423         put_online_cpus();
424         return rc;
425 }
426
427 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
428 {
429         padata_stop(engine->pte_pinst);
430         padata_unregister_cpumask_notifier(engine->pte_pinst,
431                                            &engine->pte_notifier);
432         padata_free(engine->pte_pinst);
433         destroy_workqueue(engine->pte_wq);
434 }
435
436 #else  /* !CONFIG_PADATA */
437
438 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
439                                     const char *name,
440                                     const struct cpumask *cpumask)
441 {
442         engine->pte_weight = 1;
443
444         return 0;
445 }
446
/*
 * Variant used when CONFIG_PADATA is not defined: nothing was set up by
 * the matching init function, so there is nothing to release.
 */
static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
}
450 #endif /* CONFIG_PADATA */
451
452 struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
453                                            const struct cpumask *cpumask)
454 {
455         struct cfs_ptask_engine *engine;
456         int rc;
457
458         engine = kzalloc(sizeof(*engine), GFP_KERNEL);
459         if (engine == NULL)
460                 GOTO(err, rc = -ENOMEM);
461
462         rc = cfs_ptengine_padata_init(engine, name, cpumask);
463         if (rc)
464                 GOTO(err_free_engine, rc);
465
466         return engine;
467
468 err_free_engine:
469         kfree(engine);
470 err:
471         return ERR_PTR(rc);
472 }
473 EXPORT_SYMBOL(cfs_ptengine_init);
474
/*
 * Shut down the back end of @engine and free it. A NULL or error-pointer
 * @engine is silently ignored.
 */
void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
{
        if (!IS_ERR_OR_NULL(engine)) {
                cfs_ptengine_padata_fini(engine);
                kfree(engine);
        }
}
EXPORT_SYMBOL(cfs_ptengine_fini);