Whamcloud - gitweb
LU-1365 tests: createmany outputs stat after 2 seconds
[fs/lustre-release.git] / libcfs / libcfs / libcfs_ptask.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  * Use is subject to license terms.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  *
29  * parallel task interface
30  */
31 #include <linux/errno.h>
32 #include <linux/kernel.h>
33 #include <linux/module.h>
34 #include <linux/cpumask.h>
35 #include <linux/cpu.h>
36 #include <linux/slab.h>
37 #include <linux/sched.h>
38 #ifdef HAVE_SCHED_HEADERS
39 #include <linux/sched/signal.h>
40 #include <linux/sched/mm.h>
41 #endif
42 #include <linux/moduleparam.h>
43 #include <linux/mmu_context.h>
44
45 #define DEBUG_SUBSYSTEM S_UNDEFINED
46
47 #include <libcfs/libcfs.h>
48 #include <libcfs/libcfs_ptask.h>
49
50 /**
51  * This API based on Linux kernel padada API which is used to perform
52  * encryption and decryption on large numbers of packets without
53  * reordering those packets.
54  *
55  * It was adopted for general use in Lustre for parallelization of
56  * various functionality.
57  *
58  * The first step in using it is to set up a cfs_ptask structure to
59  * control of how this task are to be run:
60  *
61  * #include <libcfs/libcfs_ptask.h>
62  *
63  * int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
64  *                    void *cbdata, unsigned int flags, int cpu);
65  *
66  * The cbfunc function with cbdata argument will be called in the process
67  * of getting the task done. The cpu specifies which CPU will be used for
68  * the final callback when the task is done.
69  *
70  * The submission of task is done with:
71  *
72  * int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine);
73  *
74  * The task is submitted to the engine for execution.
75  *
76  * In order to wait for result of task execution you should call:
77  *
78  * int cfs_ptask_wait_for(struct cfs_ptask *ptask);
79  *
80  * The tasks with flag PTF_ORDERED are executed in parallel but complete
81  * into submission order. So, waiting for last ordered task you can be sure
82  * that all previous tasks were done before this task complete.
83  */
84
85 #ifndef HAVE_REINIT_COMPLETION
86 /**
87  * reinit_completion - reinitialize a completion structure
88  * @x:  pointer to completion structure that is to be reinitialized
89  *
90  * This inline function should be used to reinitialize a completion
91  * structure so it can be reused. This is especially important after
92  * complete_all() is used.
93  */
94 static inline void reinit_completion(struct completion *x)
95 {
96         x->done = 0;
97 }
98 #endif
99
100 #ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
101 static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
102                                            const struct cpumask *mask)
103 {
104         cpulist_scnprintf(buf, PAGE_SIZE, mask);
105 }
106 #endif
107
108 #ifdef CONFIG_PADATA
109 static void cfs_ptask_complete(struct padata_priv *padata)
110 {
111         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
112
113         if (cfs_ptask_need_complete(ptask)) {
114                 if (cfs_ptask_is_ordered(ptask))
115                         complete(&ptask->pt_completion);
116         } else if (cfs_ptask_is_autofree(ptask)) {
117                 kfree(ptask);
118         }
119 }
120
121 static void cfs_ptask_execute(struct padata_priv *padata)
122 {
123         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
124         mm_segment_t old_fs = get_fs();
125         bool bh_enabled = false;
126
127         if (!cfs_ptask_is_atomic(ptask)) {
128                 local_bh_enable();
129                 bh_enabled = true;
130         }
131
132         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
133                 use_mm(ptask->pt_mm);
134                 set_fs(ptask->pt_fs);
135         }
136
137         if (ptask->pt_cbfunc != NULL)
138                 ptask->pt_result = ptask->pt_cbfunc(ptask);
139         else
140                 ptask->pt_result = -ENOSYS;
141
142         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
143                 set_fs(old_fs);
144                 unuse_mm(ptask->pt_mm);
145                 mmput(ptask->pt_mm);
146                 ptask->pt_mm = NULL;
147         }
148
149         if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
150                 complete(&ptask->pt_completion);
151
152         if (bh_enabled)
153                 local_bh_disable();
154
155         padata_do_serial(padata);
156 }
157
158 static int cfs_do_parallel(struct cfs_ptask_engine *engine,
159                            struct padata_priv *padata)
160 {
161         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
162         int rc;
163
164         if (cfs_ptask_need_complete(ptask))
165                 reinit_completion(&ptask->pt_completion);
166
167         if (cfs_ptask_use_user_mm(ptask)) {
168                 ptask->pt_mm = get_task_mm(current);
169                 ptask->pt_fs = get_fs();
170         }
171         ptask->pt_result = -EINPROGRESS;
172
173 retry:
174         rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
175         if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
176                 /* too many tasks already in queue */
177                 schedule_timeout_uninterruptible(1);
178                 goto retry;
179         }
180
181         if (rc) {
182                 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
183                         mmput(ptask->pt_mm);
184                         ptask->pt_mm = NULL;
185                 }
186                 ptask->pt_result = rc;
187         }
188
189         return rc;
190 }
191
192 /**
193  * This function submit initialized task for async execution
194  * in engine with specified id.
195  */
196 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
197 {
198         struct padata_priv *padata = cfs_ptask2padata(ptask);
199
200         if (IS_ERR_OR_NULL(engine))
201                 return -EINVAL;
202
203         memset(padata, 0, sizeof(*padata));
204
205         padata->parallel = cfs_ptask_execute;
206         padata->serial   = cfs_ptask_complete;
207
208         return cfs_do_parallel(engine, padata);
209 }
210
211 #else  /* !CONFIG_PADATA */
212
213 /**
214  * If CONFIG_PADATA is not defined this function just execute
215  * the initialized task in current thread. (emulate async execution)
216  */
217 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
218 {
219         if (IS_ERR_OR_NULL(engine))
220                 return -EINVAL;
221
222         if (ptask->pt_cbfunc != NULL)
223                 ptask->pt_result = ptask->pt_cbfunc(ptask);
224         else
225                 ptask->pt_result = -ENOSYS;
226
227         if (cfs_ptask_need_complete(ptask))
228                 complete(&ptask->pt_completion);
229         else if (cfs_ptask_is_autofree(ptask))
230                 kfree(ptask);
231
232         return 0;
233 }
234 #endif /* CONFIG_PADATA */
235
236 EXPORT_SYMBOL(cfs_ptask_submit);
237
238 /**
239  * This function waits when task complete async execution.
240  * The tasks with flag PTF_ORDERED are executed in parallel but completes
241  * into submission order. So, waiting for last ordered task you can be sure
242  * that all previous tasks were done before this task complete.
243  */
244 int cfs_ptask_wait_for(struct cfs_ptask *ptask)
245 {
246         if (!cfs_ptask_need_complete(ptask))
247                 return -EINVAL;
248
249         wait_for_completion(&ptask->pt_completion);
250
251         return 0;
252 }
253 EXPORT_SYMBOL(cfs_ptask_wait_for);
254
255 /**
256  * This function initialize internal members of task and prepare it for
257  * async execution.
258  */
259 int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
260                    unsigned int flags, int cpu)
261 {
262         memset(ptask, 0, sizeof(*ptask));
263
264         ptask->pt_flags  = flags;
265         ptask->pt_cbcpu  = cpu;
266         ptask->pt_mm     = NULL; /* will be set in cfs_do_parallel() */
267         ptask->pt_fs     = get_fs();
268         ptask->pt_cbfunc = cbfunc;
269         ptask->pt_cbdata = cbdata;
270         ptask->pt_result = -EAGAIN;
271
272         if (cfs_ptask_need_complete(ptask)) {
273                 if (cfs_ptask_is_autofree(ptask))
274                         return -EINVAL;
275
276                 init_completion(&ptask->pt_completion);
277         }
278
279         if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
280                 return -EINVAL;
281
282         return 0;
283 }
284 EXPORT_SYMBOL(cfs_ptask_init);
285
286 /**
287  * This function set the mask of allowed CPUs for parallel execution
288  * for engine with specified id.
289  */
290 int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
291                              const struct cpumask *cpumask)
292 {
293         int rc = 0;
294
295 #ifdef CONFIG_PADATA
296         cpumask_var_t serial_mask;
297         cpumask_var_t parallel_mask;
298
299         if (IS_ERR_OR_NULL(engine))
300                 return -EINVAL;
301
302         if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
303                 return -ENOMEM;
304
305         if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
306                 free_cpumask_var(serial_mask);
307                 return -ENOMEM;
308         }
309
310         cpumask_copy(parallel_mask, cpumask);
311         cpumask_copy(serial_mask, cpu_online_mask);
312
313         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_PARALLEL,
314                                 parallel_mask);
315         free_cpumask_var(parallel_mask);
316         if (rc)
317                 goto out_failed_mask;
318
319         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_SERIAL,
320                                 serial_mask);
321 out_failed_mask:
322         free_cpumask_var(serial_mask);
323 #endif /* CONFIG_PADATA */
324
325         return rc;
326 }
327 EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
328
329 /**
330  * This function returns the count of allowed CPUs for parallel execution
331  * for engine with specified id.
332  */
333 int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
334 {
335         if (IS_ERR_OR_NULL(engine))
336                 return -EINVAL;
337
338         return engine->pte_weight;
339 }
340 EXPORT_SYMBOL(cfs_ptengine_weight);
341
342 #ifdef CONFIG_PADATA
343 static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
344                                            unsigned long val, void *data)
345 {
346         struct padata_cpumask *padata_cpumask = data;
347         struct cfs_ptask_engine *engine;
348
349         engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
350
351         if (val & PADATA_CPU_PARALLEL)
352                 engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
353
354         return 0;
355 }
356
357 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
358                                     const char *name,
359                                     const struct cpumask *cpumask)
360 {
361         cpumask_var_t all_mask;
362         cpumask_var_t par_mask;
363         unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
364         int rc;
365
366         get_online_cpus();
367
368         engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
369         if (engine->pte_wq == NULL)
370                 GOTO(err, rc = -ENOMEM);
371
372         if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
373                 GOTO(err_destroy_workqueue, rc = -ENOMEM);
374
375         if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
376                 GOTO(err_free_all_mask, rc = -ENOMEM);
377
378         cpumask_copy(par_mask, cpumask);
379         if (cpumask_empty(par_mask) ||
380             cpumask_equal(par_mask, cpu_online_mask)) {
381                 cpumask_copy(all_mask, cpu_online_mask);
382                 cpumask_clear(par_mask);
383                 while (!cpumask_empty(all_mask)) {
384                         int cpu = cpumask_first(all_mask);
385
386                         cpumask_set_cpu(cpu, par_mask);
387                         cpumask_andnot(all_mask, all_mask,
388                                         topology_sibling_cpumask(cpu));
389                 }
390         }
391
392         cpumask_copy(all_mask, cpu_online_mask);
393
394         {
395                 char *pa_mask_buff, *cb_mask_buff;
396
397                 pa_mask_buff = (char *)__get_free_page(GFP_KERNEL);
398                 if (pa_mask_buff == NULL)
399                         GOTO(err_free_par_mask, rc = -ENOMEM);
400
401                 cb_mask_buff = (char *)__get_free_page(GFP_KERNEL);
402                 if (cb_mask_buff == NULL) {
403                         free_page((unsigned long)pa_mask_buff);
404                         GOTO(err_free_par_mask, rc = -ENOMEM);
405                 }
406
407                 cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
408                 pa_mask_buff[PAGE_SIZE - 1] = '\0';
409                 cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
410                 cb_mask_buff[PAGE_SIZE - 1] = '\0';
411
412                 CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
413                         name, cpumask_weight(par_mask),
414                         pa_mask_buff, cb_mask_buff);
415
416                 free_page((unsigned long)cb_mask_buff);
417                 free_page((unsigned long)pa_mask_buff);
418         }
419
420         engine->pte_weight = cpumask_weight(par_mask);
421         engine->pte_pinst  = padata_alloc_possible(engine->pte_wq);
422         if (engine->pte_pinst == NULL)
423                 GOTO(err_free_par_mask, rc = -ENOMEM);
424
425         engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
426         rc = padata_register_cpumask_notifier(engine->pte_pinst,
427                                               &engine->pte_notifier);
428         if (rc)
429                 GOTO(err_free_padata, rc);
430
431         rc = cfs_ptengine_set_cpumask(engine, par_mask);
432         if (rc)
433                 GOTO(err_unregister, rc);
434
435         rc = padata_start(engine->pte_pinst);
436         if (rc)
437                 GOTO(err_unregister, rc);
438
439         free_cpumask_var(par_mask);
440         free_cpumask_var(all_mask);
441
442         put_online_cpus();
443         return 0;
444
445 err_unregister:
446         padata_unregister_cpumask_notifier(engine->pte_pinst,
447                                            &engine->pte_notifier);
448 err_free_padata:
449         padata_free(engine->pte_pinst);
450 err_free_par_mask:
451         free_cpumask_var(par_mask);
452 err_free_all_mask:
453         free_cpumask_var(all_mask);
454 err_destroy_workqueue:
455         destroy_workqueue(engine->pte_wq);
456 err:
457         put_online_cpus();
458         return rc;
459 }
460
461 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
462 {
463         padata_stop(engine->pte_pinst);
464         padata_unregister_cpumask_notifier(engine->pte_pinst,
465                                            &engine->pte_notifier);
466         padata_free(engine->pte_pinst);
467         destroy_workqueue(engine->pte_wq);
468 }
469
470 #else  /* !CONFIG_PADATA */
471
472 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
473                                     const char *name,
474                                     const struct cpumask *cpumask)
475 {
476         engine->pte_weight = 1;
477
478         return 0;
479 }
480
481 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
482 {
483 }
484 #endif /* CONFIG_PADATA */
485
486 struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
487                                            const struct cpumask *cpumask)
488 {
489         struct cfs_ptask_engine *engine;
490         int rc;
491
492         engine = kzalloc(sizeof(*engine), GFP_KERNEL);
493         if (engine == NULL)
494                 GOTO(err, rc = -ENOMEM);
495
496         rc = cfs_ptengine_padata_init(engine, name, cpumask);
497         if (rc)
498                 GOTO(err_free_engine, rc);
499
500         return engine;
501
502 err_free_engine:
503         kfree(engine);
504 err:
505         return ERR_PTR(rc);
506 }
507 EXPORT_SYMBOL(cfs_ptengine_init);
508
509 void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
510 {
511         if (IS_ERR_OR_NULL(engine))
512                 return;
513
514         cfs_ptengine_padata_fini(engine);
515         kfree(engine);
516 }
517 EXPORT_SYMBOL(cfs_ptengine_fini);