#include <linux/errno.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/moduleparam.h>
#include <linux/mmu_context.h>

#define DEBUG_SUBSYSTEM S_UNDEFINED

#include <libcfs/libcfs.h>
#include <libcfs/libcfs_ptask.h>

/**
 * This API is based on the Linux kernel padata API, which is used to
 * perform encryption and decryption on large numbers of packets without
 * reordering those packets.
 *
 * It was adapted for general use in Lustre to parallelize various
 * functionality.
 *
 * The first step in using it is to set up a cfs_ptask structure that
 * controls how the task is to be run:
 *
 *	#include <libcfs/libcfs_ptask.h>
 *
 *	int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
 *			   void *cbdata, unsigned int flags, int cpu);
 *
 * The cbfunc function is called with the cbdata argument while the task
 * is being executed. The cpu argument specifies which CPU runs the final
 * callback when the task is done.
 *
 * A task is submitted with:
 *
 *	int cfs_ptask_submit(struct cfs_ptask *ptask,
 *			     struct cfs_ptask_engine *engine);
 *
 * The task is submitted to the engine for execution.
 *
 * To wait for the result of a task, call:
 *
 *	int cfs_ptask_wait_for(struct cfs_ptask *ptask);
 *
 * Tasks with the PTF_ORDERED flag are executed in parallel but complete
 * in submission order, so after waiting for the last ordered task you can
 * be sure that all previously submitted tasks have completed.
 */

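/*
 * A minimal usage sketch (illustrative only: my_cb(), my_data, engine and
 * the PTF_COMPLETE flag name are assumptions taken from libcfs_ptask.h,
 * not definitions made in this file):
 *
 *	static int my_cb(struct cfs_ptask *ptask)
 *	{
 *		struct my_data *data = ptask->pt_cbdata;
 *
 *		return my_do_work(data);   (return value becomes pt_result)
 *	}
 *
 *	struct cfs_ptask task;
 *	int rc;
 *
 *	rc = cfs_ptask_init(&task, my_cb, data,
 *			    PTF_COMPLETE | PTF_ORDERED, smp_processor_id());
 *	if (rc == 0)
 *		rc = cfs_ptask_submit(&task, engine);
 *	if (rc == 0)
 *		rc = cfs_ptask_wait_for(&task);
 */
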
#ifdef CONFIG_PADATA
#ifndef HAVE_REINIT_COMPLETION
/**
 * reinit_completion - reinitialize a completion structure
 * @x: pointer to completion structure that is to be reinitialized
 *
 * This inline function should be used to reinitialize a completion
 * structure so it can be reused. This is especially important after
 * complete_all() is used.
 */
static inline void reinit_completion(struct completion *x)
{
	x->done = 0;
}
#endif

#ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
					   const struct cpumask *mask)
{
	cpulist_scnprintf(buf, PAGE_SIZE, mask);
}
#endif

static void cfs_ptask_complete(struct padata_priv *padata)
{
	struct cfs_ptask *ptask = cfs_padata2ptask(padata);

	if (cfs_ptask_need_complete(ptask)) {
		/* only ordered tasks complete here, in submission order */
		if (cfs_ptask_is_ordered(ptask))
			complete(&ptask->pt_completion);
	} else if (cfs_ptask_is_autofree(ptask)) {
		kfree(ptask);
	}
}

static void cfs_ptask_execute(struct padata_priv *padata)
{
	struct cfs_ptask *ptask = cfs_padata2ptask(padata);
	mm_segment_t old_fs = get_fs();
	bool bh_enabled = false;

	/* padata invokes the parallel callback with BHs disabled */
	if (!cfs_ptask_is_atomic(ptask)) {
		local_bh_enable();
		bh_enabled = true;
	}

	/* borrow the submitter's mm and address limit if requested */
	if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
		use_mm(ptask->pt_mm);
		set_fs(ptask->pt_fs);
	}

	if (ptask->pt_cbfunc != NULL)
		ptask->pt_result = ptask->pt_cbfunc(ptask);
	else
		ptask->pt_result = -ENOSYS;

	if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
		set_fs(old_fs);
		unuse_mm(ptask->pt_mm);
		mmput(ptask->pt_mm);
		ptask->pt_mm = NULL;
	}

	/* unordered tasks complete as soon as they have been executed */
	if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
		complete(&ptask->pt_completion);

	if (bh_enabled)
		local_bh_disable();

	padata_do_serial(padata);
}

static int cfs_do_parallel(struct cfs_ptask_engine *engine,
			   struct padata_priv *padata)
{
	struct cfs_ptask *ptask = cfs_padata2ptask(padata);
	int rc;

	if (cfs_ptask_need_complete(ptask))
		reinit_completion(&ptask->pt_completion);

	if (cfs_ptask_use_user_mm(ptask)) {
		ptask->pt_mm = get_task_mm(current);
		ptask->pt_fs = get_fs();
	}
	ptask->pt_result = -EINPROGRESS;

retry:
	rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
	if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
		/* too many tasks already in queue */
		schedule_timeout_uninterruptible(1);
		goto retry;
	}

	if (rc != 0) {
		if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
			mmput(ptask->pt_mm);
			ptask->pt_mm = NULL;
		}
		ptask->pt_result = rc;
	}

	return rc;
}

/**
 * This function submits an initialized task for asynchronous execution
 * on the specified engine.
 */
int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
{
	struct padata_priv *padata = cfs_ptask2padata(ptask);

	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	memset(padata, 0, sizeof(*padata));

	padata->parallel = cfs_ptask_execute;
	padata->serial   = cfs_ptask_complete;

	return cfs_do_parallel(engine, padata);
}

#else  /* !CONFIG_PADATA */

/**
 * If CONFIG_PADATA is not defined, this function simply executes the
 * initialized task in the current thread, emulating asynchronous
 * execution.
 */
int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
{
	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	if (ptask->pt_cbfunc != NULL)
		ptask->pt_result = ptask->pt_cbfunc(ptask);
	else
		ptask->pt_result = -ENOSYS;

	if (cfs_ptask_need_complete(ptask))
		complete(&ptask->pt_completion);
	else if (cfs_ptask_is_autofree(ptask))
		kfree(ptask);

	return 0;
}
#endif /* CONFIG_PADATA */

EXPORT_SYMBOL(cfs_ptask_submit);

/**
 * This function waits until the task completes its asynchronous execution.
 * Tasks with the PTF_ORDERED flag are executed in parallel but complete
 * in submission order, so after waiting for the last ordered task you can
 * be sure that all previously submitted tasks have completed.
 */
int cfs_ptask_wait_for(struct cfs_ptask *ptask)
{
	if (!cfs_ptask_need_complete(ptask))
		return -EINVAL;

	wait_for_completion(&ptask->pt_completion);

	return 0;
}
EXPORT_SYMBOL(cfs_ptask_wait_for);

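/*
 * For example (a sketch; ptasks[] and engine are hypothetical): when N
 * PTF_ORDERED tasks have been submitted to the same engine, waiting for
 * the last one is enough to know that all of them have completed:
 *
 *	for (i = 0; i < N; i++)
 *		rc = cfs_ptask_submit(&ptasks[i], engine);
 *
 *	rc = cfs_ptask_wait_for(&ptasks[N - 1]);
 */
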
/**
 * This function initializes the internal members of a task and prepares
 * it for execution.
 */
int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
		   unsigned int flags, int cpu)
{
	memset(ptask, 0, sizeof(*ptask));

	ptask->pt_flags	 = flags;
	ptask->pt_cbcpu	 = cpu;
	ptask->pt_mm	 = NULL; /* will be set in cfs_do_parallel() */
	ptask->pt_fs	 = get_fs();
	ptask->pt_cbfunc = cbfunc;
	ptask->pt_cbdata = cbdata;
	ptask->pt_result = -EAGAIN;

	if (cfs_ptask_need_complete(ptask)) {
		/* a task that is waited for cannot free itself */
		if (cfs_ptask_is_autofree(ptask))
			return -EINVAL;

		init_completion(&ptask->pt_completion);
	}

	/* an atomic task cannot borrow the user mm context */
	if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
		return -EINVAL;

	return 0;
}
EXPORT_SYMBOL(cfs_ptask_init);

/**
 * This function sets the mask of CPUs that the specified engine is
 * allowed to use for parallel execution.
 */
int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
			     const struct cpumask *cpumask)
{
	int rc = 0;

#ifdef CONFIG_PADATA
	cpumask_var_t serial_mask;
	cpumask_var_t parallel_mask;

	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
		return -ENOMEM;

	if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
		free_cpumask_var(serial_mask);
		return -ENOMEM;
	}

	cpumask_copy(parallel_mask, cpumask);
	cpumask_copy(serial_mask, cpu_online_mask);

	rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_PARALLEL,
				parallel_mask);
	free_cpumask_var(parallel_mask);
	if (rc != 0)
		goto out_failed_mask;

	rc = padata_set_cpumask(engine->pte_pinst, PADATA_CPU_SERIAL,
				serial_mask);
out_failed_mask:
	free_cpumask_var(serial_mask);
#endif /* CONFIG_PADATA */

	return rc;
}
EXPORT_SYMBOL(cfs_ptengine_set_cpumask);

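/*
 * For example (a sketch; engine is assumed to come from
 * cfs_ptengine_init()), restricting parallel execution to CPUs 0-3:
 *
 *	cpumask_var_t mask;
 *	int cpu, rc;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_clear(mask);
 *	for (cpu = 0; cpu < 4; cpu++)
 *		cpumask_set_cpu(cpu, mask);
 *	rc = cfs_ptengine_set_cpumask(engine, mask);
 *	free_cpumask_var(mask);
 */
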
/**
 * This function returns the number of CPUs that the specified engine is
 * allowed to use for parallel execution.
 */
int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
{
	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	return engine->pte_weight;
}
EXPORT_SYMBOL(cfs_ptengine_weight);

#ifdef CONFIG_PADATA
static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
					   unsigned long val, void *data)
{
	struct padata_cpumask *padata_cpumask = data;
	struct cfs_ptask_engine *engine;

	engine = container_of(self, struct cfs_ptask_engine, pte_notifier);

	/* keep pte_weight in sync with the parallel cpumask */
	if (val & PADATA_CPU_PARALLEL)
		engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);

	return 0;
}

static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
				    const char *name,
				    const struct cpumask *cpumask)
{
	cpumask_var_t all_mask;
	cpumask_var_t par_mask;
	unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
	int rc;

	get_online_cpus();

	engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
	if (engine->pte_wq == NULL)
		GOTO(err, rc = -ENOMEM);

	if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
		GOTO(err_destroy_workqueue, rc = -ENOMEM);

	if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
		GOTO(err_free_all_mask, rc = -ENOMEM);

	cpumask_copy(par_mask, cpumask);
	if (cpumask_empty(par_mask) ||
	    cpumask_equal(par_mask, cpu_online_mask)) {
		/* no explicit mask was given: pick one CPU per core,
		 * skipping hyper-threading siblings */
		cpumask_copy(all_mask, cpu_online_mask);
		cpumask_clear(par_mask);
		while (!cpumask_empty(all_mask)) {
			int cpu = cpumask_first(all_mask);

			cpumask_set_cpu(cpu, par_mask);
			cpumask_andnot(all_mask, all_mask,
				       topology_sibling_cpumask(cpu));
		}
	}

	cpumask_copy(all_mask, cpu_online_mask);

	{
		char *pa_mask_buff, *cb_mask_buff;

		pa_mask_buff = (char *)__get_free_page(GFP_KERNEL);
		if (pa_mask_buff == NULL)
			GOTO(err_free_par_mask, rc = -ENOMEM);

		cb_mask_buff = (char *)__get_free_page(GFP_KERNEL);
		if (cb_mask_buff == NULL) {
			free_page((unsigned long)pa_mask_buff);
			GOTO(err_free_par_mask, rc = -ENOMEM);
		}

		cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
		pa_mask_buff[PAGE_SIZE - 1] = '\0';
		cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
		cb_mask_buff[PAGE_SIZE - 1] = '\0';

		CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
		       name, cpumask_weight(par_mask),
		       pa_mask_buff, cb_mask_buff);

		free_page((unsigned long)cb_mask_buff);
		free_page((unsigned long)pa_mask_buff);
	}

	engine->pte_weight = cpumask_weight(par_mask);
	engine->pte_pinst  = padata_alloc_possible(engine->pte_wq);
	if (engine->pte_pinst == NULL)
		GOTO(err_free_par_mask, rc = -ENOMEM);

	engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
	rc = padata_register_cpumask_notifier(engine->pte_pinst,
					      &engine->pte_notifier);
	if (rc != 0)
		GOTO(err_free_padata, rc);

	rc = cfs_ptengine_set_cpumask(engine, par_mask);
	if (rc != 0)
		GOTO(err_unregister, rc);

	rc = padata_start(engine->pte_pinst);
	if (rc != 0)
		GOTO(err_unregister, rc);

	free_cpumask_var(par_mask);
	free_cpumask_var(all_mask);

	put_online_cpus();
	return 0;

err_unregister:
	padata_unregister_cpumask_notifier(engine->pte_pinst,
					   &engine->pte_notifier);
err_free_padata:
	padata_free(engine->pte_pinst);
err_free_par_mask:
	free_cpumask_var(par_mask);
err_free_all_mask:
	free_cpumask_var(all_mask);
err_destroy_workqueue:
	destroy_workqueue(engine->pte_wq);
err:
	put_online_cpus();
	return rc;
}

static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
	padata_stop(engine->pte_pinst);
	padata_unregister_cpumask_notifier(engine->pte_pinst,
					   &engine->pte_notifier);
	padata_free(engine->pte_pinst);
	destroy_workqueue(engine->pte_wq);
}

#else  /* !CONFIG_PADATA */

static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
				    const char *name,
				    const struct cpumask *cpumask)
{
	engine->pte_weight = 1;

	return 0;
}

static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
}
#endif /* CONFIG_PADATA */

struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
					   const struct cpumask *cpumask)
{
	struct cfs_ptask_engine *engine;
	int rc;

	engine = kzalloc(sizeof(*engine), GFP_KERNEL);
	if (engine == NULL)
		GOTO(err, rc = -ENOMEM);

	rc = cfs_ptengine_padata_init(engine, name, cpumask);
	if (rc != 0)
		GOTO(err_free_engine, rc);

	return engine;

err_free_engine:
	kfree(engine);
err:
	return ERR_PTR(rc);
}
EXPORT_SYMBOL(cfs_ptengine_init);

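/*
 * A typical engine lifecycle (a sketch; the "pt_example" name is made up):
 *
 *	struct cfs_ptask_engine *engine;
 *
 *	engine = cfs_ptengine_init("pt_example", cpu_online_mask);
 *	if (IS_ERR(engine))
 *		return PTR_ERR(engine);
 *
 *	... submit tasks with cfs_ptask_submit(ptask, engine) ...
 *
 *	cfs_ptengine_fini(engine);
 */
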
void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
{
	if (IS_ERR_OR_NULL(engine))
		return;

	cfs_ptengine_padata_fini(engine);
	kfree(engine);
}
EXPORT_SYMBOL(cfs_ptengine_fini);