Whamcloud - gitweb
LU-10308 misc: update Intel copyright messages for 2017
[fs/lustre-release.git] / libcfs / libcfs / libcfs_ptask.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  * Use is subject to license terms.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  *
29  * parallel task interface
30  */
31 #include <linux/errno.h>
32 #include <linux/kernel.h>
33 #include <linux/module.h>
34 #include <linux/cpumask.h>
35 #include <linux/cpu.h>
36 #include <linux/slab.h>
37 #include <linux/sched.h>
38 #include <linux/moduleparam.h>
39 #include <linux/mmu_context.h>
40
41 #define DEBUG_SUBSYSTEM S_UNDEFINED
42
43 #include <libcfs/libcfs.h>
44 #include <libcfs/libcfs_ptask.h>
45
46 /**
47  * This API based on Linux kernel padada API which is used to perform
48  * encryption and decryption on large numbers of packets without
49  * reordering those packets.
50  *
51  * It was adopted for general use in Lustre for parallelization of
52  * various functionality.
53  *
54  * The first step in using it is to set up a cfs_ptask structure to
55  * control of how this task are to be run:
56  *
57  * #include <libcfs/libcfs_ptask.h>
58  *
59  * int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
60  *                    void *cbdata, unsigned int flags, int cpu);
61  *
62  * The cbfunc function with cbdata argument will be called in the process
63  * of getting the task done. The cpu specifies which CPU will be used for
64  * the final callback when the task is done.
65  *
66  * The submission of task is done with:
67  *
68  * int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine);
69  *
70  * The task is submitted to the engine for execution.
71  *
72  * In order to wait for result of task execution you should call:
73  *
74  * int cfs_ptask_wait_for(struct cfs_ptask *ptask);
75  *
76  * The tasks with flag PTF_ORDERED are executed in parallel but complete
77  * into submission order. So, waiting for last ordered task you can be sure
78  * that all previous tasks were done before this task complete.
79  */
80
81 #ifndef HAVE_REINIT_COMPLETION
82 /**
83  * reinit_completion - reinitialize a completion structure
84  * @x:  pointer to completion structure that is to be reinitialized
85  *
86  * This inline function should be used to reinitialize a completion
87  * structure so it can be reused. This is especially important after
88  * complete_all() is used.
89  */
90 static inline void reinit_completion(struct completion *x)
91 {
92         x->done = 0;
93 }
94 #endif
95
96 #ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
97 static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
98                                            const struct cpumask *mask)
99 {
100         cpulist_scnprintf(buf, PAGE_SIZE, mask);
101 }
102 #endif
103
104 #ifdef CONFIG_PADATA
105 static void cfs_ptask_complete(struct padata_priv *padata)
106 {
107         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
108
109         if (cfs_ptask_need_complete(ptask)) {
110                 if (cfs_ptask_is_ordered(ptask))
111                         complete(&ptask->pt_completion);
112         } else if (cfs_ptask_is_autofree(ptask)) {
113                 kfree(ptask);
114         }
115 }
116
117 static void cfs_ptask_execute(struct padata_priv *padata)
118 {
119         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
120         mm_segment_t old_fs = get_fs();
121         bool bh_enabled = false;
122
123         if (!cfs_ptask_is_atomic(ptask)) {
124                 local_bh_enable();
125                 bh_enabled = true;
126         }
127
128         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
129                 use_mm(ptask->pt_mm);
130                 set_fs(ptask->pt_fs);
131         }
132
133         if (ptask->pt_cbfunc != NULL)
134                 ptask->pt_result = ptask->pt_cbfunc(ptask);
135         else
136                 ptask->pt_result = -ENOSYS;
137
138         if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
139                 set_fs(old_fs);
140                 unuse_mm(ptask->pt_mm);
141                 mmput(ptask->pt_mm);
142                 ptask->pt_mm = NULL;
143         }
144
145         if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
146                 complete(&ptask->pt_completion);
147
148         if (bh_enabled)
149                 local_bh_disable();
150
151         padata_do_serial(padata);
152 }
153
154 static int cfs_do_parallel(struct cfs_ptask_engine *engine,
155                            struct padata_priv *padata)
156 {
157         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
158         int rc;
159
160         if (cfs_ptask_need_complete(ptask))
161                 reinit_completion(&ptask->pt_completion);
162
163         if (cfs_ptask_use_user_mm(ptask)) {
164                 ptask->pt_mm = get_task_mm(current);
165                 ptask->pt_fs = get_fs();
166         }
167         ptask->pt_result = -EINPROGRESS;
168
169 retry:
170         rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
171         if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
172                 /* too many tasks already in queue */
173                 schedule_timeout_uninterruptible(1);
174                 goto retry;
175         }
176
177         if (rc) {
178                 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
179                         mmput(ptask->pt_mm);
180                         ptask->pt_mm = NULL;
181                 }
182                 ptask->pt_result = rc;
183         }
184
185         return rc;
186 }
187
188 /**
189  * This function submit initialized task for async execution
190  * in engine with specified id.
191  */
192 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
193 {
194         struct padata_priv *padata = cfs_ptask2padata(ptask);
195
196         if (IS_ERR_OR_NULL(engine))
197                 return -EINVAL;
198
199         memset(padata, 0, sizeof(*padata));
200
201         padata->parallel = cfs_ptask_execute;
202         padata->serial   = cfs_ptask_complete;
203
204         return cfs_do_parallel(engine, padata);
205 }
206
207 #else  /* !CONFIG_PADATA */
208
209 /**
210  * If CONFIG_PADATA is not defined this function just execute
211  * the initialized task in current thread. (emulate async execution)
212  */
213 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
214 {
215         if (IS_ERR_OR_NULL(engine))
216                 return -EINVAL;
217
218         if (ptask->pt_cbfunc != NULL)
219                 ptask->pt_result = ptask->pt_cbfunc(ptask);
220         else
221                 ptask->pt_result = -ENOSYS;
222
223         if (cfs_ptask_need_complete(ptask))
224                 complete(&ptask->pt_completion);
225         else if (cfs_ptask_is_autofree(ptask))
226                 kfree(ptask);
227
228         return 0;
229 }
230 #endif /* CONFIG_PADATA */
231
232 EXPORT_SYMBOL(cfs_ptask_submit);
233
234 /**
235  * This function waits when task complete async execution.
236  * The tasks with flag PTF_ORDERED are executed in parallel but completes
237  * into submission order. So, waiting for last ordered task you can be sure
238  * that all previous tasks were done before this task complete.
239  */
240 int cfs_ptask_wait_for(struct cfs_ptask *ptask)
241 {
242         if (!cfs_ptask_need_complete(ptask))
243                 return -EINVAL;
244
245         wait_for_completion(&ptask->pt_completion);
246
247         return 0;
248 }
249 EXPORT_SYMBOL(cfs_ptask_wait_for);
250
251 /**
252  * This function initialize internal members of task and prepare it for
253  * async execution.
254  */
255 int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
256                    unsigned int flags, int cpu)
257 {
258         memset(ptask, 0, sizeof(*ptask));
259
260         ptask->pt_flags  = flags;
261         ptask->pt_cbcpu  = cpu;
262         ptask->pt_mm     = NULL; /* will be set in cfs_do_parallel() */
263         ptask->pt_fs     = get_fs();
264         ptask->pt_cbfunc = cbfunc;
265         ptask->pt_cbdata = cbdata;
266         ptask->pt_result = -EAGAIN;
267
268         if (cfs_ptask_need_complete(ptask)) {
269                 if (cfs_ptask_is_autofree(ptask))
270                         return -EINVAL;
271
272                 init_completion(&ptask->pt_completion);
273         }
274
275         if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
276                 return -EINVAL;
277
278         return 0;
279 }
280 EXPORT_SYMBOL(cfs_ptask_init);
281
282 /**
283  * This function set the mask of allowed CPUs for parallel execution
284  * for engine with specified id.
285  */
286 int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
287                              const struct cpumask *cpumask)
288 {
289         int rc = 0;
290
291 #ifdef CONFIG_PADATA
292         cpumask_var_t serial_mask;
293         cpumask_var_t parallel_mask;
294
295         if (IS_ERR_OR_NULL(engine))
296                 return -EINVAL;
297
298         if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
299                 return -ENOMEM;
300
301         if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
302                 free_cpumask_var(serial_mask);
303                 return -ENOMEM;
304         }
305
306         cpumask_copy(parallel_mask, cpumask);
307         cpumask_copy(serial_mask, cpu_online_mask);
308
309         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_PARALLEL,
310                                 parallel_mask);
311         free_cpumask_var(parallel_mask);
312         if (rc)
313                 goto out_failed_mask;
314
315         rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_SERIAL,
316                                 serial_mask);
317 out_failed_mask:
318         free_cpumask_var(serial_mask);
319 #endif /* CONFIG_PADATA */
320
321         return rc;
322 }
323 EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
324
325 /**
326  * This function returns the count of allowed CPUs for parallel execution
327  * for engine with specified id.
328  */
329 int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
330 {
331         if (IS_ERR_OR_NULL(engine))
332                 return -EINVAL;
333
334         return engine->pte_weight;
335 }
336 EXPORT_SYMBOL(cfs_ptengine_weight);
337
338 #ifdef CONFIG_PADATA
339 static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
340                                            unsigned long val, void *data)
341 {
342         struct padata_cpumask *padata_cpumask = data;
343         struct cfs_ptask_engine *engine;
344
345         engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
346
347         if (val & PADATA_CPU_PARALLEL)
348                 engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
349
350         return 0;
351 }
352
353 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
354                                     const char *name,
355                                     const struct cpumask *cpumask)
356 {
357         cpumask_var_t all_mask;
358         cpumask_var_t par_mask;
359         unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
360         int rc;
361
362         get_online_cpus();
363
364         engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
365         if (engine->pte_wq == NULL)
366                 GOTO(err, rc = -ENOMEM);
367
368         if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
369                 GOTO(err_destroy_workqueue, rc = -ENOMEM);
370
371         if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
372                 GOTO(err_free_all_mask, rc = -ENOMEM);
373
374         cpumask_copy(par_mask, cpumask);
375         if (cpumask_empty(par_mask) ||
376             cpumask_equal(par_mask, cpu_online_mask)) {
377                 cpumask_copy(all_mask, cpu_online_mask);
378                 cpumask_clear(par_mask);
379                 while (!cpumask_empty(all_mask)) {
380                         int cpu = cpumask_first(all_mask);
381
382                         cpumask_set_cpu(cpu, par_mask);
383                         cpumask_andnot(all_mask, all_mask,
384                                         topology_sibling_cpumask(cpu));
385                 }
386         }
387
388         cpumask_copy(all_mask, cpu_online_mask);
389
390         {
391                 char *pa_mask_buff, *cb_mask_buff;
392
393                 pa_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
394                 if (pa_mask_buff == NULL)
395                         GOTO(err_free_par_mask, rc = -ENOMEM);
396
397                 cb_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
398                 if (cb_mask_buff == NULL) {
399                         free_page((unsigned long)pa_mask_buff);
400                         GOTO(err_free_par_mask, rc = -ENOMEM);
401                 }
402
403                 cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
404                 pa_mask_buff[PAGE_SIZE - 1] = '\0';
405                 cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
406                 cb_mask_buff[PAGE_SIZE - 1] = '\0';
407
408                 CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
409                         name, cpumask_weight(par_mask),
410                         pa_mask_buff, cb_mask_buff);
411
412                 free_page((unsigned long)cb_mask_buff);
413                 free_page((unsigned long)pa_mask_buff);
414         }
415
416         engine->pte_weight = cpumask_weight(par_mask);
417         engine->pte_pinst  = padata_alloc_possible(engine->pte_wq);
418         if (engine->pte_pinst == NULL)
419                 GOTO(err_free_par_mask, rc = -ENOMEM);
420
421         engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
422         rc = padata_register_cpumask_notifier(engine->pte_pinst,
423                                               &engine->pte_notifier);
424         if (rc)
425                 GOTO(err_free_padata, rc);
426
427         rc = cfs_ptengine_set_cpumask(engine, par_mask);
428         if (rc)
429                 GOTO(err_unregister, rc);
430
431         rc = padata_start(engine->pte_pinst);
432         if (rc)
433                 GOTO(err_unregister, rc);
434
435         free_cpumask_var(par_mask);
436         free_cpumask_var(all_mask);
437
438         put_online_cpus();
439         return 0;
440
441 err_unregister:
442         padata_unregister_cpumask_notifier(engine->pte_pinst,
443                                            &engine->pte_notifier);
444 err_free_padata:
445         padata_free(engine->pte_pinst);
446 err_free_par_mask:
447         free_cpumask_var(par_mask);
448 err_free_all_mask:
449         free_cpumask_var(all_mask);
450 err_destroy_workqueue:
451         destroy_workqueue(engine->pte_wq);
452 err:
453         put_online_cpus();
454         return rc;
455 }
456
457 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
458 {
459         padata_stop(engine->pte_pinst);
460         padata_unregister_cpumask_notifier(engine->pte_pinst,
461                                            &engine->pte_notifier);
462         padata_free(engine->pte_pinst);
463         destroy_workqueue(engine->pte_wq);
464 }
465
466 #else  /* !CONFIG_PADATA */
467
468 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
469                                     const char *name,
470                                     const struct cpumask *cpumask)
471 {
472         engine->pte_weight = 1;
473
474         return 0;
475 }
476
477 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
478 {
479 }
480 #endif /* CONFIG_PADATA */
481
482 struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
483                                            const struct cpumask *cpumask)
484 {
485         struct cfs_ptask_engine *engine;
486         int rc;
487
488         engine = kzalloc(sizeof(*engine), GFP_KERNEL);
489         if (engine == NULL)
490                 GOTO(err, rc = -ENOMEM);
491
492         rc = cfs_ptengine_padata_init(engine, name, cpumask);
493         if (rc)
494                 GOTO(err_free_engine, rc);
495
496         return engine;
497
498 err_free_engine:
499         kfree(engine);
500 err:
501         return ERR_PTR(rc);
502 }
503 EXPORT_SYMBOL(cfs_ptengine_init);
504
505 void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
506 {
507         if (IS_ERR_OR_NULL(engine))
508                 return;
509
510         cfs_ptengine_padata_fini(engine);
511         kfree(engine);
512 }
513 EXPORT_SYMBOL(cfs_ptengine_fini);