Whamcloud - gitweb
LU-8964 libcfs: Introduce parallel tasks framework
[fs/lustre-release.git] / libcfs / libcfs / libcfs_ptask.c
1 #include <linux/errno.h>
2 #include <linux/kernel.h>
3 #include <linux/module.h>
4 #include <linux/cpumask.h>
5 #include <linux/cpu.h>
6 #include <linux/slab.h>
7 #include <linux/sched.h>
8 #include <linux/moduleparam.h>
9 #include <linux/mmu_context.h>
10
11 #define DEBUG_SUBSYSTEM S_UNDEFINED
12
13 #include <libcfs/libcfs.h>
14 #include <libcfs/libcfs_ptask.h>
15
16 /**
 * This API is based on the Linux kernel padata API, which is used
 * to perform encryption and decryption on large numbers of packets
 * without reordering those packets.
20  *
21  * It was adopted for general use in Lustre for parallelization of
22  * various functionality.
23  *
24  * The first step in using it is to set up a cfs_ptask structure to
25  * control of how this task are to be run:
26  *
27  * #include <libcfs/libcfs_ptask.h>
28  *
29  * int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
30  *                    void *cbdata, unsigned int flags, int cpu);
31  *
32  * The cbfunc function with cbdata argument will be called in the process
33  * of getting the task done. The cpu specifies which CPU will be used for
34  * the final callback when the task is done.
35  *
36  * The submission of task is done with:
37  *
38  * int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine);
39  *
40  * The task is submitted to the engine for execution.
41  *
42  * In order to wait for result of task execution you should call:
43  *
44  * int cfs_ptask_wait_for(struct cfs_ptask *ptask);
45  *
 * Tasks with the PTF_ORDERED flag are executed in parallel but complete
 * in submission order. So, by waiting for the last ordered task you can
 * be sure that all previously submitted tasks have completed as well.
49  */
50
#ifndef HAVE_REINIT_COMPLETION
/**
 * reinit_completion - reinitialize a completion structure
 * @x:  pointer to completion structure that is to be reinitialized
 *
 * This inline function should be used to reinitialize a completion
 * structure so it can be reused. This is especially important after
 * complete_all() is used.
 *
 * Compatibility fallback for kernels that do not provide
 * reinit_completion(); identical to the upstream one-line
 * implementation (resets the done counter only, no locking needed
 * since callers must not race reinit with waiters).
 */
static inline void reinit_completion(struct completion *x)
{
	x->done = 0;
}
#endif
65
#ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
/*
 * Compatibility fallback for kernels without cpumap_print_to_pagebuf():
 * format the cpumask as a CPU list into a page-sized buffer.
 * NOTE(review): the bool argument is ignored here; callers in this file
 * always pass true (list format), which matches this fallback.
 */
static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
					   const struct cpumask *mask)
{
	cpulist_scnprintf(buf, PAGE_SIZE, mask);
}
#endif
73
74 #ifdef CONFIG_PADATA
75 static void cfs_ptask_complete(struct padata_priv *padata)
76 {
77         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
78
79         if (cfs_ptask_need_complete(ptask)) {
80                 if (cfs_ptask_is_ordered(ptask))
81                         complete(&ptask->pt_completion);
82         } else if (cfs_ptask_is_autofree(ptask)) {
83                 kfree(ptask);
84         }
85 }
86
/*
 * padata parallel callback: runs the user-supplied task callback on one
 * of the engine's worker CPUs.  The bh enable/disable pair implies this
 * is entered with softirqs disabled; the mm/fs juggling lets a callback
 * access the submitter's user address space.
 */
static void cfs_ptask_execute(struct padata_priv *padata)
{
	struct cfs_ptask *ptask = cfs_padata2ptask(padata);
	mm_segment_t old_fs = get_fs();	/* saved for restoration below */
	bool bh_enabled = false;

	/* non-atomic tasks may sleep in their callback, so re-enable
	 * softirqs for the duration of the call */
	if (!cfs_ptask_is_atomic(ptask)) {
		local_bh_enable();
		bh_enabled = true;
	}

	/* adopt the submitter's mm (pinned in cfs_do_parallel()) so the
	 * callback can reach its user memory */
	if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
		use_mm(ptask->pt_mm);
		set_fs(ptask->pt_fs);
	}

	if (ptask->pt_cbfunc != NULL)
		ptask->pt_result = ptask->pt_cbfunc(ptask);
	else
		ptask->pt_result = -ENOSYS;	/* no callback registered */

	/* restore our own context and drop the mm reference taken at
	 * submission time */
	if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
		set_fs(old_fs);
		unuse_mm(ptask->pt_mm);
		mmput(ptask->pt_mm);
		ptask->pt_mm = NULL;
	}

	/* unordered tasks complete as soon as they have run; ordered
	 * ones wait until cfs_ptask_complete() after serialization */
	if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
		complete(&ptask->pt_completion);

	if (bh_enabled)
		local_bh_disable();

	/* hand the task back to padata for in-order serialization */
	padata_do_serial(padata);
}
123
124 static int cfs_do_parallel(struct cfs_ptask_engine *engine,
125                            struct padata_priv *padata)
126 {
127         struct cfs_ptask *ptask = cfs_padata2ptask(padata);
128         int rc;
129
130         if (cfs_ptask_need_complete(ptask))
131                 reinit_completion(&ptask->pt_completion);
132
133         if (cfs_ptask_use_user_mm(ptask)) {
134                 ptask->pt_mm = get_task_mm(current);
135                 ptask->pt_fs = get_fs();
136         }
137         ptask->pt_result = -EINPROGRESS;
138
139 retry:
140         rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
141         if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
142                 /* too many tasks already in queue */
143                 schedule_timeout_uninterruptible(1);
144                 goto retry;
145         }
146
147         if (rc) {
148                 if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
149                         mmput(ptask->pt_mm);
150                         ptask->pt_mm = NULL;
151                 }
152                 ptask->pt_result = rc;
153         }
154
155         return rc;
156 }
157
158 /**
159  * This function submit initialized task for async execution
160  * in engine with specified id.
161  */
162 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
163 {
164         struct padata_priv *padata = cfs_ptask2padata(ptask);
165
166         if (IS_ERR_OR_NULL(engine))
167                 return -EINVAL;
168
169         memset(padata, 0, sizeof(*padata));
170
171         padata->parallel = cfs_ptask_execute;
172         padata->serial   = cfs_ptask_complete;
173
174         return cfs_do_parallel(engine, padata);
175 }
176
177 #else  /* !CONFIG_PADATA */
178
179 /**
180  * If CONFIG_PADATA is not defined this function just execute
181  * the initialized task in current thread. (emulate async execution)
182  */
183 int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
184 {
185         if (IS_ERR_OR_NULL(engine))
186                 return -EINVAL;
187
188         if (ptask->pt_cbfunc != NULL)
189                 ptask->pt_result = ptask->pt_cbfunc(ptask);
190         else
191                 ptask->pt_result = -ENOSYS;
192
193         if (cfs_ptask_need_complete(ptask))
194                 complete(&ptask->pt_completion);
195         else if (cfs_ptask_is_autofree(ptask))
196                 kfree(ptask);
197
198         return 0;
199 }
200 #endif /* CONFIG_PADATA */
201
202 EXPORT_SYMBOL(cfs_ptask_submit);
203
204 /**
205  * This function waits when task complete async execution.
206  * The tasks with flag PTF_ORDERED are executed in parallel but completes
207  * into submission order. So, waiting for last ordered task you can be sure
208  * that all previous tasks were done before this task complete.
209  */
210 int cfs_ptask_wait_for(struct cfs_ptask *ptask)
211 {
212         if (!cfs_ptask_need_complete(ptask))
213                 return -EINVAL;
214
215         wait_for_completion(&ptask->pt_completion);
216
217         return 0;
218 }
219 EXPORT_SYMBOL(cfs_ptask_wait_for);
220
221 /**
222  * This function initialize internal members of task and prepare it for
223  * async execution.
224  */
225 int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
226                    unsigned int flags, int cpu)
227 {
228         memset(ptask, 0, sizeof(*ptask));
229
230         ptask->pt_flags  = flags;
231         ptask->pt_cbcpu  = cpu;
232         ptask->pt_mm     = NULL; /* will be set in cfs_do_parallel() */
233         ptask->pt_fs     = get_fs();
234         ptask->pt_cbfunc = cbfunc;
235         ptask->pt_cbdata = cbdata;
236         ptask->pt_result = -EAGAIN;
237
238         if (cfs_ptask_need_complete(ptask)) {
239                 if (cfs_ptask_is_autofree(ptask))
240                         return -EINVAL;
241
242                 init_completion(&ptask->pt_completion);
243         }
244
245         if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
246                 return -EINVAL;
247
248         return 0;
249 }
250 EXPORT_SYMBOL(cfs_ptask_init);
251
/**
 * Set the mask of CPUs allowed for parallel execution by this engine;
 * the serial (callback) mask is reset to all online CPUs.  Without
 * CONFIG_PADATA this is a no-op returning 0.
 */
int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
			     const struct cpumask *cpumask)
{
	int rc = 0;

#ifdef CONFIG_PADATA
	cpumask_var_t par_mask;
	cpumask_var_t ser_mask;

	if (IS_ERR_OR_NULL(engine))
		return -EINVAL;

	if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
		return -ENOMEM;

	if (!alloc_cpumask_var(&ser_mask, GFP_KERNEL)) {
		free_cpumask_var(par_mask);
		return -ENOMEM;
	}

	/* parallel work on the requested CPUs, serialization anywhere */
	cpumask_copy(par_mask, cpumask);
	cpumask_copy(ser_mask, cpu_online_mask);

	rc = padata_set_cpumasks(engine->pte_pinst, par_mask, ser_mask);

	free_cpumask_var(ser_mask);
	free_cpumask_var(par_mask);
#endif /* CONFIG_PADATA */

	return rc;
}
EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
288
289 /**
290  * This function returns the count of allowed CPUs for parallel execution
291  * for engine with specified id.
292  */
293 int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
294 {
295         if (IS_ERR_OR_NULL(engine))
296                 return -EINVAL;
297
298         return engine->pte_weight;
299 }
300 EXPORT_SYMBOL(cfs_ptengine_weight);
301
302 #ifdef CONFIG_PADATA
303 static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
304                                            unsigned long val, void *data)
305 {
306         struct padata_cpumask *padata_cpumask = data;
307         struct cfs_ptask_engine *engine;
308
309         engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
310
311         if (val & PADATA_CPU_PARALLEL)
312                 engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
313
314         return 0;
315 }
316
/*
 * Allocate and start the padata instance backing an engine.
 *
 * If the caller's cpumask is empty or covers every online CPU, the
 * parallel mask is reduced to one CPU per sibling (hyper-thread)
 * group; otherwise the caller's mask is used as-is.  The serial
 * (callback) mask is always the full online mask.
 */
static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
				    const char *name,
				    const struct cpumask *cpumask)
{
	cpumask_var_t all_mask;
	cpumask_var_t par_mask;
	unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
	int rc;

	/* block CPU hotplug while the masks are computed and applied */
	get_online_cpus();

	engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
	if (engine->pte_wq == NULL)
		GOTO(err, rc = -ENOMEM);

	if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
		GOTO(err_destroy_workqueue, rc = -ENOMEM);

	if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
		GOTO(err_free_all_mask, rc = -ENOMEM);

	cpumask_copy(par_mask, cpumask);
	if (cpumask_empty(par_mask) ||
	    cpumask_equal(par_mask, cpu_online_mask)) {
		cpumask_copy(all_mask, cpu_online_mask);
		cpumask_clear(par_mask);
		/* pick the first CPU of each sibling group, removing the
		 * whole group from the candidate set each iteration */
		while (!cpumask_empty(all_mask)) {
			int cpu = cpumask_first(all_mask);

			cpumask_set_cpu(cpu, par_mask);
			cpumask_andnot(all_mask, all_mask,
					topology_sibling_cpumask(cpu));
		}
	}

	/* serial callbacks may run on any online CPU */
	cpumask_copy(all_mask, cpu_online_mask);

	{
		/* log the chosen masks; cpumap_print_to_pagebuf() needs
		 * page-sized buffers */
		char *pa_mask_buff, *cb_mask_buff;

		pa_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
		if (pa_mask_buff == NULL)
			GOTO(err_free_par_mask, rc = -ENOMEM);

		cb_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
		if (cb_mask_buff == NULL) {
			free_page((unsigned long)pa_mask_buff);
			GOTO(err_free_par_mask, rc = -ENOMEM);
		}

		cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
		pa_mask_buff[PAGE_SIZE - 1] = '\0';
		cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
		cb_mask_buff[PAGE_SIZE - 1] = '\0';

		CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
			name, cpumask_weight(par_mask),
			pa_mask_buff, cb_mask_buff);

		free_page((unsigned long)cb_mask_buff);
		free_page((unsigned long)pa_mask_buff);
	}

	engine->pte_weight = cpumask_weight(par_mask);
	engine->pte_pinst  = padata_alloc(engine->pte_wq, par_mask, all_mask);
	if (engine->pte_pinst == NULL)
		GOTO(err_free_par_mask, rc = -ENOMEM);

	/* keep pte_weight current when the parallel mask changes */
	engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
	rc = padata_register_cpumask_notifier(engine->pte_pinst,
					      &engine->pte_notifier);
	if (rc)
		GOTO(err_free_padata, rc);

	rc = padata_start(engine->pte_pinst);
	if (rc)
		GOTO(err_unregister, rc);

	/* the local mask copies are no longer needed after padata_alloc() */
	free_cpumask_var(par_mask);
	free_cpumask_var(all_mask);

	put_online_cpus();
	return 0;

	/* error unwinding, in reverse order of acquisition */
err_unregister:
	padata_unregister_cpumask_notifier(engine->pte_pinst,
					   &engine->pte_notifier);
err_free_padata:
	padata_free(engine->pte_pinst);
err_free_par_mask:
	free_cpumask_var(par_mask);
err_free_all_mask:
	free_cpumask_var(all_mask);
err_destroy_workqueue:
	destroy_workqueue(engine->pte_wq);
err:
	put_online_cpus();
	return rc;
}
416
/*
 * Tear down an engine's padata instance: stop it, detach the cpumask
 * notifier, then free the instance and its workqueue — the reverse of
 * the order used in cfs_ptengine_padata_init().
 */
static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
	padata_stop(engine->pte_pinst);
	padata_unregister_cpumask_notifier(engine->pte_pinst,
					   &engine->pte_notifier);
	padata_free(engine->pte_pinst);
	destroy_workqueue(engine->pte_wq);
}
425
426 #else  /* !CONFIG_PADATA */
427
/*
 * Without padata, tasks run synchronously in the submitter's thread
 * (see the !CONFIG_PADATA cfs_ptask_submit()), so the effective
 * parallelism is a single CPU.
 */
static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
				    const char *name,
				    const struct cpumask *cpumask)
{
	engine->pte_weight = 1;

	return 0;
}

static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
{
	/* nothing was allocated by the !CONFIG_PADATA init */
}
440 #endif /* CONFIG_PADATA */
441
/*
 * Allocate a parallel-task engine and start its padata backend.
 *
 * Returns the new engine, or ERR_PTR(-errno) on failure; callers must
 * check the result with IS_ERR().  The GOTO() macro is kept for its
 * trace-logging side effect.
 */
struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
					   const struct cpumask *cpumask)
{
	struct cfs_ptask_engine *engine;
	int rc;

	engine = kzalloc(sizeof(*engine), GFP_KERNEL);
	if (engine == NULL)
		GOTO(err, rc = -ENOMEM);

	rc = cfs_ptengine_padata_init(engine, name, cpumask);
	if (rc)
		GOTO(err_free_engine, rc);

	return engine;

err_free_engine:
	kfree(engine);
err:
	return ERR_PTR(rc);
}
EXPORT_SYMBOL(cfs_ptengine_init);
464
/*
 * Stop and free an engine created by cfs_ptengine_init().
 * NULL and ERR_PTR values are tolerated and silently ignored.
 */
void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
{
	if (!IS_ERR_OR_NULL(engine)) {
		cfs_ptengine_padata_fini(engine);
		kfree(engine);
	}
}
EXPORT_SYMBOL(cfs_ptengine_fini);