/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * Copyright (c) 2017, Intel Corporation.
 * Use is subject to license terms.
 *
 * This file is part of Lustre, http://www.lustre.org/
 *
 * parallel task interface
 */
#include <linux/errno.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/cpu.h>
#include <linux/slab.h>
#include <linux/sched.h>
#ifdef HAVE_SCHED_HEADERS
#include <linux/sched/signal.h>
#include <linux/sched/mm.h>
#endif
#include <linux/moduleparam.h>
#include <linux/mmu_context.h>

#define DEBUG_SUBSYSTEM S_UNDEFINED

#include <libcfs/libcfs.h>
#include <libcfs/libcfs_ptask.h>
/**
 * This API is based on the Linux kernel padata API, which is used to
 * perform encryption and decryption on large numbers of packets without
 * reordering those packets.
 *
 * It was adopted for general use in Lustre for the parallelization of
 * various functionality.
 *
 * The first step in using it is to set up a cfs_ptask structure to
 * control how this task is to be run:
 *
 *	#include <libcfs/libcfs_ptask.h>
 *
 *	int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
 *			   void *cbdata, unsigned int flags, int cpu);
 *
 * The cbfunc function with the cbdata argument will be called in the
 * process of getting the task done. The cpu specifies which CPU will be
 * used for the final callback when the task is done.
 *
 * The submission of the task is done with:
 *
 *	int cfs_ptask_submit(struct cfs_ptask *ptask,
 *			     struct cfs_ptask_engine *engine);
 *
 * The task is submitted to the engine for execution.
 *
 * In order to wait for the result of the task execution you should call:
 *
 *	int cfs_ptask_wait_for(struct cfs_ptask *ptask);
 *
 * Tasks with the flag PTF_ORDERED are executed in parallel but complete
 * in submission order, so after waiting for the last ordered task you can
 * be sure that all previous tasks completed before it.
 */
85 #ifndef HAVE_REINIT_COMPLETION
87 * reinit_completion - reinitialize a completion structure
88 * @x: pointer to completion structure that is to be reinitialized
90 * This inline function should be used to reinitialize a completion
91 * structure so it can be reused. This is especially important after
92 * complete_all() is used.
94 static inline void reinit_completion(struct completion *x)
100 #ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
101 static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
102 const struct cpumask *mask)
104 cpulist_scnprintf(buf, PAGE_SIZE, mask);
109 static void cfs_ptask_complete(struct padata_priv *padata)
111 struct cfs_ptask *ptask = cfs_padata2ptask(padata);
113 if (cfs_ptask_need_complete(ptask)) {
114 if (cfs_ptask_is_ordered(ptask))
115 complete(&ptask->pt_completion);
116 } else if (cfs_ptask_is_autofree(ptask)) {
121 static void cfs_ptask_execute(struct padata_priv *padata)
123 struct cfs_ptask *ptask = cfs_padata2ptask(padata);
124 mm_segment_t old_fs = get_fs();
125 bool bh_enabled = false;
127 if (!cfs_ptask_is_atomic(ptask)) {
132 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
133 use_mm(ptask->pt_mm);
134 set_fs(ptask->pt_fs);
137 if (ptask->pt_cbfunc != NULL)
138 ptask->pt_result = ptask->pt_cbfunc(ptask);
140 ptask->pt_result = -ENOSYS;
142 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
144 unuse_mm(ptask->pt_mm);
149 if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
150 complete(&ptask->pt_completion);
155 padata_do_serial(padata);
158 static int cfs_do_parallel(struct cfs_ptask_engine *engine,
159 struct padata_priv *padata)
161 struct cfs_ptask *ptask = cfs_padata2ptask(padata);
164 if (cfs_ptask_need_complete(ptask))
165 reinit_completion(&ptask->pt_completion);
167 if (cfs_ptask_use_user_mm(ptask)) {
168 ptask->pt_mm = get_task_mm(current);
169 ptask->pt_fs = get_fs();
171 ptask->pt_result = -EINPROGRESS;
174 rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
175 if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
176 /* too many tasks already in queue */
177 schedule_timeout_uninterruptible(1);
182 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
186 ptask->pt_result = rc;
193 * This function submit initialized task for async execution
194 * in engine with specified id.
196 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
198 struct padata_priv *padata = cfs_ptask2padata(ptask);
200 if (IS_ERR_OR_NULL(engine))
203 memset(padata, 0, sizeof(*padata));
205 padata->parallel = cfs_ptask_execute;
206 padata->serial = cfs_ptask_complete;
208 return cfs_do_parallel(engine, padata);
211 #else /* !CONFIG_PADATA */
214 * If CONFIG_PADATA is not defined this function just execute
215 * the initialized task in current thread. (emulate async execution)
217 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
219 if (IS_ERR_OR_NULL(engine))
222 if (ptask->pt_cbfunc != NULL)
223 ptask->pt_result = ptask->pt_cbfunc(ptask);
225 ptask->pt_result = -ENOSYS;
227 if (cfs_ptask_need_complete(ptask))
228 complete(&ptask->pt_completion);
229 else if (cfs_ptask_is_autofree(ptask))
234 #endif /* CONFIG_PADATA */
236 EXPORT_SYMBOL(cfs_ptask_submit);
239 * This function waits when task complete async execution.
240 * The tasks with flag PTF_ORDERED are executed in parallel but completes
241 * into submission order. So, waiting for last ordered task you can be sure
242 * that all previous tasks were done before this task complete.
244 int cfs_ptask_wait_for(struct cfs_ptask *ptask)
246 if (!cfs_ptask_need_complete(ptask))
249 wait_for_completion(&ptask->pt_completion);
253 EXPORT_SYMBOL(cfs_ptask_wait_for);
256 * This function initialize internal members of task and prepare it for
259 int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
260 unsigned int flags, int cpu)
262 memset(ptask, 0, sizeof(*ptask));
264 ptask->pt_flags = flags;
265 ptask->pt_cbcpu = cpu;
266 ptask->pt_mm = NULL; /* will be set in cfs_do_parallel() */
267 ptask->pt_fs = get_fs();
268 ptask->pt_cbfunc = cbfunc;
269 ptask->pt_cbdata = cbdata;
270 ptask->pt_result = -EAGAIN;
272 if (cfs_ptask_need_complete(ptask)) {
273 if (cfs_ptask_is_autofree(ptask))
276 init_completion(&ptask->pt_completion);
279 if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
284 EXPORT_SYMBOL(cfs_ptask_init);
/**
 * This function sets the mask of allowed CPUs for parallel execution
 * for the given engine.  The serial (callback) mask is reset to all
 * online CPUs.  Without CONFIG_PADATA it is a no-op returning 0.
 *
 * NOTE(review): the original copy had "&parallel_mask" mis-encoded as
 * a pilcrow entity; restored here.
 */
int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
			     const struct cpumask *cpumask)
{
	int rc = 0;

#ifdef CONFIG_PADATA
	cpumask_var_t serial_mask;
	cpumask_var_t parallel_mask;

	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
		return -ENOMEM;

	if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
		free_cpumask_var(serial_mask);
		return -ENOMEM;
	}

	cpumask_copy(parallel_mask, cpumask);
	cpumask_copy(serial_mask, cpu_online_mask);

	rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_PARALLEL,
				parallel_mask);
	free_cpumask_var(parallel_mask);
	if (rc != 0)
		goto out_failed_mask;

	rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_SERIAL,
				serial_mask);
out_failed_mask:
	free_cpumask_var(serial_mask);
#endif /* CONFIG_PADATA */

	return rc;
}
EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
330 * This function returns the count of allowed CPUs for parallel execution
331 * for engine with specified id.
333 int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
335 if (IS_ERR_OR_NULL(engine))
338 return engine->pte_weight;
340 EXPORT_SYMBOL(cfs_ptengine_weight);
343 static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
344 unsigned long val, void *data)
346 struct padata_cpumask *padata_cpumask = data;
347 struct cfs_ptask_engine *engine;
349 engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
351 if (val & PADATA_CPU_PARALLEL)
352 engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
357 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
359 const struct cpumask *cpumask)
361 cpumask_var_t all_mask;
362 cpumask_var_t par_mask;
363 unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
368 engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
369 if (engine->pte_wq == NULL)
370 GOTO(err, rc = -ENOMEM);
372 if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
373 GOTO(err_destroy_workqueue, rc = -ENOMEM);
375 if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
376 GOTO(err_free_all_mask, rc = -ENOMEM);
378 cpumask_copy(par_mask, cpumask);
379 if (cpumask_empty(par_mask) ||
380 cpumask_equal(par_mask, cpu_online_mask)) {
381 cpumask_copy(all_mask, cpu_online_mask);
382 cpumask_clear(par_mask);
383 while (!cpumask_empty(all_mask)) {
384 int cpu = cpumask_first(all_mask);
386 cpumask_set_cpu(cpu, par_mask);
387 cpumask_andnot(all_mask, all_mask,
388 topology_sibling_cpumask(cpu));
392 cpumask_copy(all_mask, cpu_online_mask);
395 char *pa_mask_buff, *cb_mask_buff;
397 pa_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
398 if (pa_mask_buff == NULL)
399 GOTO(err_free_par_mask, rc = -ENOMEM);
401 cb_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
402 if (cb_mask_buff == NULL) {
403 free_page((unsigned long)pa_mask_buff);
404 GOTO(err_free_par_mask, rc = -ENOMEM);
407 cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
408 pa_mask_buff[PAGE_SIZE - 1] = '\0';
409 cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
410 cb_mask_buff[PAGE_SIZE - 1] = '\0';
412 CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
413 name, cpumask_weight(par_mask),
414 pa_mask_buff, cb_mask_buff);
416 free_page((unsigned long)cb_mask_buff);
417 free_page((unsigned long)pa_mask_buff);
420 engine->pte_weight = cpumask_weight(par_mask);
421 engine->pte_pinst = padata_alloc_possible(engine->pte_wq);
422 if (engine->pte_pinst == NULL)
423 GOTO(err_free_par_mask, rc = -ENOMEM);
425 engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
426 rc = padata_register_cpumask_notifier(engine->pte_pinst,
427 &engine->pte_notifier);
429 GOTO(err_free_padata, rc);
431 rc = cfs_ptengine_set_cpumask(engine, par_mask);
433 GOTO(err_unregister, rc);
435 rc = padata_start(engine->pte_pinst);
437 GOTO(err_unregister, rc);
439 free_cpumask_var(par_mask);
440 free_cpumask_var(all_mask);
446 padata_unregister_cpumask_notifier(engine->pte_pinst,
447 &engine->pte_notifier);
449 padata_free(engine->pte_pinst);
451 free_cpumask_var(par_mask);
453 free_cpumask_var(all_mask);
454 err_destroy_workqueue:
455 destroy_workqueue(engine->pte_wq);
461 static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
463 padata_stop(engine->pte_pinst);
464 padata_unregister_cpumask_notifier(engine->pte_pinst,
465 &engine->pte_notifier);
466 padata_free(engine->pte_pinst);
467 destroy_workqueue(engine->pte_wq);
470 #else /* !CONFIG_PADATA */
472 static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
474 const struct cpumask *cpumask)
476 engine->pte_weight = 1;
/* Nothing to tear down when padata support is compiled out. */
static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
}
484 #endif /* CONFIG_PADATA */
486 struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
487 const struct cpumask *cpumask)
489 struct cfs_ptask_engine *engine;
492 engine = kzalloc(sizeof(*engine), GFP_KERNEL);
494 GOTO(err, rc = -ENOMEM);
496 rc = cfs_ptengine_padata_init(engine, name, cpumask);
498 GOTO(err_free_engine, rc);
507 EXPORT_SYMBOL(cfs_ptengine_init);
/*
 * Shut down an engine created by cfs_ptengine_init() and free it.
 * Tolerates NULL/ERR_PTR so callers can pass an init failure straight in.
 */
void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
{
	if (IS_ERR_OR_NULL(engine))
		return;

	cfs_ptengine_padata_fini(engine);

	kfree(engine);
}
EXPORT_SYMBOL(cfs_ptengine_fini);