Whamcloud - gitweb
LU-8964 libcfs: Introduce parallel tasks framework 74/24474/26
authorDmitry Eremin <dmitry.eremin@intel.com>
Tue, 20 Dec 2016 09:10:05 +0000 (12:10 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 30 May 2017 13:28:08 +0000 (13:28 +0000)
In this patch new API for parallel tasks execution is introduced.
This API based on Linux kernel padata API which is used to perform
encryption and decryption on large numbers of packets without
reordering those packets.

It was adopted for general use in Lustre for parallelization of
various functionality. The first place of its usage is parallel I/O
implementation.

The first step in using it is to set up a cfs_ptask structure to
control of how this task are to be run:

    #include <libcfs/libcfs_ptask.h>

    int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
                       void *cbdata, unsigned int flags, int cpu);

The cbfunc function with cbdata argument will be called in the process
of getting the task done. The cpu specifies which CPU will be used for
the final callback when the task is done.

The submission of task is done with:

    int cfs_ptask_submit(struct cfs_ptask *ptask,
                         struct cfs_ptask_engine *engine);

The task is submitted to the engine for execution.

In order to wait for result of task execution you should call:

    int cfs_ptask_wait_for(struct cfs_ptask *ptask);

The tasks with flag PTF_ORDERED are executed in parallel but complete
into submission order. So, waiting for last ordered task you can be sure
that all previous tasks were done before this task complete.

Test-Parameters: trivial
Change-Id: I36d9802226f9b6fae627594a86ad1a83f7bc6b0c
Signed-off-by: Dmitry Eremin <dmitry.eremin@intel.com>
Reviewed-on: https://review.whamcloud.com/24474
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
libcfs/autoconf/lustre-libcfs.m4
libcfs/include/libcfs/Makefile.am
libcfs/include/libcfs/libcfs_ptask.h [new file with mode: 0644]
libcfs/libcfs/Makefile.in
libcfs/libcfs/libcfs_ptask.c [new file with mode: 0644]

index fb93f0e..f81b92a 100644 (file)
@@ -85,6 +85,22 @@ stacktrace_ops_warning, [
 ]) # LIBCFS_STACKTRACE_WARNING
 
 #
+# LIBCFS_REINIT_COMPLETION
+#
+AC_DEFUN([LIBCFS_REINIT_COMPLETION], [
+LB_CHECK_COMPILE([if 'reinit_completion' exists],
+reinit_completion, [
+       #include <linux/completion.h>
+],[
+       struct completion x;
+       reinit_completion(&x);
+],[
+       AC_DEFINE(HAVE_REINIT_COMPLETION, 1,
+               [reinit_completion is exist])
+])
+]) # LIBCFS_REINIT_COMPLETION
+
+#
 # LC_SHRINKER_WANT_SHRINK_PTR
 #
 # RHEL6/2.6.32 want to have pointer to shrinker self pointer in handler function
@@ -487,6 +503,27 @@ ktime_get_seconds, [
 ]) # LIBCFS_KTIME_GET_SECONDS
 
 #
+# Kernel version 3.19 commit 5aaba36318e5995e8c95d077a46d9a4d00fcc1cd
+# This patch creates a new helper function cpumap_print_to_pagebuf in
+# cpumask.h using newly added bitmap_print_to_pagebuf and consolidates
+# most of those sysfs functions using the new helper function.
+#
+AC_DEFUN([LIBCFS_HAVE_CPUMASK_PRINT_TO_PAGEBUF],[
+LB_CHECK_COMPILE([does function 'cpumap_print_to_pagebuf' exist],
+cpumap_print_to_pagebuf, [
+       #include <linux/topology.h>
+],[
+       int n;
+       char *buf = NULL;
+       const struct cpumask *mask = NULL;
+       n = cpumap_print_to_pagebuf(true, buf, mask);
+],[
+       AC_DEFINE(HAVE_CPUMASK_PRINT_TO_PAGEBUF, 1,
+               [cpumap_print_to_pagebuf is available])
+])
+]) # LIBCFS_HAVE_CPUMASK_PRINT_TO_PAGEBUF
+
+#
 # Kernel version 4.2 changed topology_thread_cpumask
 # to topology_sibling_cpumask
 #
@@ -629,6 +666,7 @@ LIBCFS_DUMP_TRACE_ADDRESS
 LC_SHRINK_CONTROL
 # 3.0
 LIBCFS_STACKTRACE_WARNING
+LIBCFS_REINIT_COMPLETION
 # 3.5
 LIBCFS_PROCESS_NAMESPACE
 LIBCFS_I_UID_READ
@@ -656,6 +694,7 @@ LIBCFS_TIMESPEC64_SUB
 LIBCFS_TIMESPEC64_TO_KTIME
 # 3.19
 LIBCFS_KTIME_GET_SECONDS
+LIBCFS_HAVE_CPUMASK_PRINT_TO_PAGEBUF
 # 4.2
 LIBCFS_HAVE_TOPOLOGY_SIBLING_CPUMASK
 LIBCFS_FPU_API
index a9f6866..1ba83ea 100644 (file)
@@ -18,4 +18,5 @@ EXTRA_DIST = \
        libcfs_private.h \
        libcfs_string.h \
        libcfs_time.h \
-       libcfs_workitem.h
+       libcfs_workitem.h \
+       libcfs_ptask.h
diff --git a/libcfs/include/libcfs/libcfs_ptask.h b/libcfs/include/libcfs/libcfs_ptask.h
new file mode 100644 (file)
index 0000000..be78b50
--- /dev/null
@@ -0,0 +1,112 @@
+#ifndef __LIBCFS_PTASK_H__
+#define __LIBCFS_PTASK_H__
+
+#include <linux/types.h>
+#include <linux/bitops.h>
+#include <linux/kernel.h>
+#include <linux/cpumask.h>
+#include <linux/uaccess.h>
+#include <linux/notifier.h>
+#include <linux/workqueue.h>
+#include <linux/completion.h>
+#ifdef CONFIG_PADATA
+#include <linux/padata.h>
+#else
+struct padata_priv {};
+struct padata_instance {};
+#endif
+
+#define PTF_COMPLETE   BIT(0)
+#define PTF_AUTOFREE   BIT(1)
+#define PTF_ORDERED    BIT(2)
+#define PTF_USER_MM    BIT(3)
+#define PTF_ATOMIC     BIT(4)
+#define PTF_RETRY      BIT(5)
+
+struct cfs_ptask_engine {
+       struct padata_instance  *pte_pinst;
+       struct workqueue_struct *pte_wq;
+       struct notifier_block    pte_notifier;
+       int                      pte_weight;
+};
+
+struct cfs_ptask;
+typedef int (*cfs_ptask_cb_t)(struct cfs_ptask *);
+
+struct cfs_ptask {
+       struct padata_priv       pt_padata;
+       struct completion        pt_completion;
+       mm_segment_t             pt_fs;
+       struct mm_struct        *pt_mm;
+       unsigned int             pt_flags;
+       int                      pt_cbcpu;
+       cfs_ptask_cb_t           pt_cbfunc;
+       void                    *pt_cbdata;
+       int                      pt_result;
+};
+
+static inline
+struct padata_priv *cfs_ptask2padata(struct cfs_ptask *ptask)
+{
+       return &ptask->pt_padata;
+}
+
+static inline
+struct cfs_ptask *cfs_padata2ptask(struct padata_priv *padata)
+{
+       return container_of(padata, struct cfs_ptask, pt_padata);
+}
+
+static inline
+bool cfs_ptask_need_complete(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_COMPLETE;
+}
+
+static inline
+bool cfs_ptask_is_autofree(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_AUTOFREE;
+}
+
+static inline
+bool cfs_ptask_is_ordered(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_ORDERED;
+}
+
+static inline
+bool cfs_ptask_use_user_mm(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_USER_MM;
+}
+
+static inline
+bool cfs_ptask_is_atomic(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_ATOMIC;
+}
+
+static inline
+bool cfs_ptask_is_retry(struct cfs_ptask *ptask)
+{
+       return ptask->pt_flags & PTF_RETRY;
+}
+
+static inline
+int cfs_ptask_result(struct cfs_ptask *ptask)
+{
+       return ptask->pt_result;
+}
+
+struct cfs_ptask_engine *cfs_ptengine_init(const char *, const struct cpumask *);
+void cfs_ptengine_fini(struct cfs_ptask_engine *);
+int  cfs_ptengine_set_cpumask(struct cfs_ptask_engine *, const struct cpumask *);
+int  cfs_ptengine_weight(struct cfs_ptask_engine *);
+
+int  cfs_ptask_submit(struct cfs_ptask *, struct cfs_ptask_engine *);
+int  cfs_ptask_wait_for(struct cfs_ptask *);
+int  cfs_ptask_init(struct cfs_ptask *, cfs_ptask_cb_t, void *,
+                   unsigned int, int);
+
+#endif /* __LIBCFS_PTASK_H__ */
index 04cd94d..b559cbc 100644 (file)
@@ -16,7 +16,8 @@ libcfs-linux-objs := $(addprefix linux/,$(libcfs-linux-objs))
 libcfs-all-objs := debug.o fail.o module.o tracefile.o watchdog.o \
                   libcfs_string.o hash.o \
                   prng.o workitem.o libcfs_cpu.o \
-                  libcfs_mem.o libcfs_lock.o heap.o
+                  libcfs_mem.o libcfs_lock.o heap.o \
+                  libcfs_ptask.o
 
 libcfs-objs := $(libcfs-linux-objs) $(libcfs-all-objs)
 
diff --git a/libcfs/libcfs/libcfs_ptask.c b/libcfs/libcfs/libcfs_ptask.c
new file mode 100644 (file)
index 0000000..0548dae
--- /dev/null
@@ -0,0 +1,473 @@
+#include <linux/errno.h>
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/cpumask.h>
+#include <linux/cpu.h>
+#include <linux/slab.h>
+#include <linux/sched.h>
+#include <linux/moduleparam.h>
+#include <linux/mmu_context.h>
+
+#define DEBUG_SUBSYSTEM S_UNDEFINED
+
+#include <libcfs/libcfs.h>
+#include <libcfs/libcfs_ptask.h>
+
+/**
+ * This API based on Linux kernel padada API which is used to perform
+ * encryption and decryption on large numbers of packets without
+ * reordering those packets.
+ *
+ * It was adopted for general use in Lustre for parallelization of
+ * various functionality.
+ *
+ * The first step in using it is to set up a cfs_ptask structure to
+ * control of how this task are to be run:
+ *
+ * #include <libcfs/libcfs_ptask.h>
+ *
+ * int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc,
+ *                    void *cbdata, unsigned int flags, int cpu);
+ *
+ * The cbfunc function with cbdata argument will be called in the process
+ * of getting the task done. The cpu specifies which CPU will be used for
+ * the final callback when the task is done.
+ *
+ * The submission of task is done with:
+ *
+ * int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine);
+ *
+ * The task is submitted to the engine for execution.
+ *
+ * In order to wait for result of task execution you should call:
+ *
+ * int cfs_ptask_wait_for(struct cfs_ptask *ptask);
+ *
+ * The tasks with flag PTF_ORDERED are executed in parallel but complete
+ * into submission order. So, waiting for last ordered task you can be sure
+ * that all previous tasks were done before this task complete.
+ */
+
+#ifndef HAVE_REINIT_COMPLETION
+/**
+ * reinit_completion - reinitialize a completion structure
+ * @x:  pointer to completion structure that is to be reinitialized
+ *
+ * This inline function should be used to reinitialize a completion
+ * structure so it can be reused. This is especially important after
+ * complete_all() is used.
+ */
+static inline void reinit_completion(struct completion *x)
+{
+       x->done = 0;
+}
+#endif
+
+#ifndef HAVE_CPUMASK_PRINT_TO_PAGEBUF
+static inline void cpumap_print_to_pagebuf(bool unused, char *buf,
+                                          const struct cpumask *mask)
+{
+       cpulist_scnprintf(buf, PAGE_SIZE, mask);
+}
+#endif
+
+#ifdef CONFIG_PADATA
+static void cfs_ptask_complete(struct padata_priv *padata)
+{
+       struct cfs_ptask *ptask = cfs_padata2ptask(padata);
+
+       if (cfs_ptask_need_complete(ptask)) {
+               if (cfs_ptask_is_ordered(ptask))
+                       complete(&ptask->pt_completion);
+       } else if (cfs_ptask_is_autofree(ptask)) {
+               kfree(ptask);
+       }
+}
+
+static void cfs_ptask_execute(struct padata_priv *padata)
+{
+       struct cfs_ptask *ptask = cfs_padata2ptask(padata);
+       mm_segment_t old_fs = get_fs();
+       bool bh_enabled = false;
+
+       if (!cfs_ptask_is_atomic(ptask)) {
+               local_bh_enable();
+               bh_enabled = true;
+       }
+
+       if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
+               use_mm(ptask->pt_mm);
+               set_fs(ptask->pt_fs);
+       }
+
+       if (ptask->pt_cbfunc != NULL)
+               ptask->pt_result = ptask->pt_cbfunc(ptask);
+       else
+               ptask->pt_result = -ENOSYS;
+
+       if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
+               set_fs(old_fs);
+               unuse_mm(ptask->pt_mm);
+               mmput(ptask->pt_mm);
+               ptask->pt_mm = NULL;
+       }
+
+       if (cfs_ptask_need_complete(ptask) && !cfs_ptask_is_ordered(ptask))
+               complete(&ptask->pt_completion);
+
+       if (bh_enabled)
+               local_bh_disable();
+
+       padata_do_serial(padata);
+}
+
+static int cfs_do_parallel(struct cfs_ptask_engine *engine,
+                          struct padata_priv *padata)
+{
+       struct cfs_ptask *ptask = cfs_padata2ptask(padata);
+       int rc;
+
+       if (cfs_ptask_need_complete(ptask))
+               reinit_completion(&ptask->pt_completion);
+
+       if (cfs_ptask_use_user_mm(ptask)) {
+               ptask->pt_mm = get_task_mm(current);
+               ptask->pt_fs = get_fs();
+       }
+       ptask->pt_result = -EINPROGRESS;
+
+retry:
+       rc = padata_do_parallel(engine->pte_pinst, padata, ptask->pt_cbcpu);
+       if (rc == -EBUSY && cfs_ptask_is_retry(ptask)) {
+               /* too many tasks already in queue */
+               schedule_timeout_uninterruptible(1);
+               goto retry;
+       }
+
+       if (rc) {
+               if (cfs_ptask_use_user_mm(ptask) && ptask->pt_mm != NULL) {
+                       mmput(ptask->pt_mm);
+                       ptask->pt_mm = NULL;
+               }
+               ptask->pt_result = rc;
+       }
+
+       return rc;
+}
+
+/**
+ * This function submit initialized task for async execution
+ * in engine with specified id.
+ */
+int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
+{
+       struct padata_priv *padata = cfs_ptask2padata(ptask);
+
+       if (IS_ERR_OR_NULL(engine))
+               return -EINVAL;
+
+       memset(padata, 0, sizeof(*padata));
+
+       padata->parallel = cfs_ptask_execute;
+       padata->serial   = cfs_ptask_complete;
+
+       return cfs_do_parallel(engine, padata);
+}
+
+#else  /* !CONFIG_PADATA */
+
+/**
+ * If CONFIG_PADATA is not defined this function just execute
+ * the initialized task in current thread. (emulate async execution)
+ */
+int cfs_ptask_submit(struct cfs_ptask *ptask, struct cfs_ptask_engine *engine)
+{
+       if (IS_ERR_OR_NULL(engine))
+               return -EINVAL;
+
+       if (ptask->pt_cbfunc != NULL)
+               ptask->pt_result = ptask->pt_cbfunc(ptask);
+       else
+               ptask->pt_result = -ENOSYS;
+
+       if (cfs_ptask_need_complete(ptask))
+               complete(&ptask->pt_completion);
+       else if (cfs_ptask_is_autofree(ptask))
+               kfree(ptask);
+
+       return 0;
+}
+#endif /* CONFIG_PADATA */
+
+EXPORT_SYMBOL(cfs_ptask_submit);
+
+/**
+ * This function waits when task complete async execution.
+ * The tasks with flag PTF_ORDERED are executed in parallel but completes
+ * into submission order. So, waiting for last ordered task you can be sure
+ * that all previous tasks were done before this task complete.
+ */
+int cfs_ptask_wait_for(struct cfs_ptask *ptask)
+{
+       if (!cfs_ptask_need_complete(ptask))
+               return -EINVAL;
+
+       wait_for_completion(&ptask->pt_completion);
+
+       return 0;
+}
+EXPORT_SYMBOL(cfs_ptask_wait_for);
+
+/**
+ * This function initialize internal members of task and prepare it for
+ * async execution.
+ */
+int cfs_ptask_init(struct cfs_ptask *ptask, cfs_ptask_cb_t cbfunc, void *cbdata,
+                  unsigned int flags, int cpu)
+{
+       memset(ptask, 0, sizeof(*ptask));
+
+       ptask->pt_flags  = flags;
+       ptask->pt_cbcpu  = cpu;
+       ptask->pt_mm     = NULL; /* will be set in cfs_do_parallel() */
+       ptask->pt_fs     = get_fs();
+       ptask->pt_cbfunc = cbfunc;
+       ptask->pt_cbdata = cbdata;
+       ptask->pt_result = -EAGAIN;
+
+       if (cfs_ptask_need_complete(ptask)) {
+               if (cfs_ptask_is_autofree(ptask))
+                       return -EINVAL;
+
+               init_completion(&ptask->pt_completion);
+       }
+
+       if (cfs_ptask_is_atomic(ptask) && cfs_ptask_use_user_mm(ptask))
+               return -EINVAL;
+
+       return 0;
+}
+EXPORT_SYMBOL(cfs_ptask_init);
+
+/**
+ * This function set the mask of allowed CPUs for parallel execution
+ * for engine with specified id.
+ */
+int cfs_ptengine_set_cpumask(struct cfs_ptask_engine *engine,
+                            const struct cpumask *cpumask)
+{
+       int rc = 0;
+
+#ifdef CONFIG_PADATA
+       cpumask_var_t serial_mask;
+       cpumask_var_t parallel_mask;
+
+       if (IS_ERR_OR_NULL(engine))
+               return -EINVAL;
+
+       if (!alloc_cpumask_var(&serial_mask, GFP_KERNEL))
+               return -ENOMEM;
+
+       if (!alloc_cpumask_var(&parallel_mask, GFP_KERNEL)) {
+               free_cpumask_var(serial_mask);
+               return -ENOMEM;
+       }
+
+       cpumask_copy(parallel_mask, cpumask);
+       cpumask_copy(serial_mask, cpu_online_mask);
+
+       rc = padata_set_cpumasks(engine->pte_pinst, parallel_mask, serial_mask);
+
+       free_cpumask_var(parallel_mask);
+       free_cpumask_var(serial_mask);
+#endif /* CONFIG_PADATA */
+
+       return rc;
+}
+EXPORT_SYMBOL(cfs_ptengine_set_cpumask);
+
+/**
+ * This function returns the count of allowed CPUs for parallel execution
+ * for engine with specified id.
+ */
+int cfs_ptengine_weight(struct cfs_ptask_engine *engine)
+{
+       if (IS_ERR_OR_NULL(engine))
+               return -EINVAL;
+
+       return engine->pte_weight;
+}
+EXPORT_SYMBOL(cfs_ptengine_weight);
+
+#ifdef CONFIG_PADATA
+static int cfs_ptask_cpumask_change_notify(struct notifier_block *self,
+                                          unsigned long val, void *data)
+{
+       struct padata_cpumask *padata_cpumask = data;
+       struct cfs_ptask_engine *engine;
+
+       engine = container_of(self, struct cfs_ptask_engine, pte_notifier);
+
+       if (val & PADATA_CPU_PARALLEL)
+               engine->pte_weight = cpumask_weight(padata_cpumask->pcpu);
+
+       return 0;
+}
+
+static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
+                                   const char *name,
+                                   const struct cpumask *cpumask)
+{
+       cpumask_var_t all_mask;
+       cpumask_var_t par_mask;
+       unsigned int wq_flags = WQ_MEM_RECLAIM | WQ_CPU_INTENSIVE;
+       int rc;
+
+       get_online_cpus();
+
+       engine->pte_wq = alloc_workqueue(name, wq_flags, 1);
+       if (engine->pte_wq == NULL)
+               GOTO(err, rc = -ENOMEM);
+
+       if (!alloc_cpumask_var(&all_mask, GFP_KERNEL))
+               GOTO(err_destroy_workqueue, rc = -ENOMEM);
+
+       if (!alloc_cpumask_var(&par_mask, GFP_KERNEL))
+               GOTO(err_free_all_mask, rc = -ENOMEM);
+
+       cpumask_copy(par_mask, cpumask);
+       if (cpumask_empty(par_mask) ||
+           cpumask_equal(par_mask, cpu_online_mask)) {
+               cpumask_copy(all_mask, cpu_online_mask);
+               cpumask_clear(par_mask);
+               while (!cpumask_empty(all_mask)) {
+                       int cpu = cpumask_first(all_mask);
+
+                       cpumask_set_cpu(cpu, par_mask);
+                       cpumask_andnot(all_mask, all_mask,
+                                       topology_sibling_cpumask(cpu));
+               }
+       }
+
+       cpumask_copy(all_mask, cpu_online_mask);
+
+       {
+               char *pa_mask_buff, *cb_mask_buff;
+
+               pa_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
+               if (pa_mask_buff == NULL)
+                       GOTO(err_free_par_mask, rc = -ENOMEM);
+
+               cb_mask_buff = (char *)__get_free_page(GFP_TEMPORARY);
+               if (cb_mask_buff == NULL) {
+                       free_page((unsigned long)pa_mask_buff);
+                       GOTO(err_free_par_mask, rc = -ENOMEM);
+               }
+
+               cpumap_print_to_pagebuf(true, pa_mask_buff, par_mask);
+               pa_mask_buff[PAGE_SIZE - 1] = '\0';
+               cpumap_print_to_pagebuf(true, cb_mask_buff, all_mask);
+               cb_mask_buff[PAGE_SIZE - 1] = '\0';
+
+               CDEBUG(D_INFO, "%s weight=%u plist='%s' cblist='%s'\n",
+                       name, cpumask_weight(par_mask),
+                       pa_mask_buff, cb_mask_buff);
+
+               free_page((unsigned long)cb_mask_buff);
+               free_page((unsigned long)pa_mask_buff);
+       }
+
+       engine->pte_weight = cpumask_weight(par_mask);
+       engine->pte_pinst  = padata_alloc(engine->pte_wq, par_mask, all_mask);
+       if (engine->pte_pinst == NULL)
+               GOTO(err_free_par_mask, rc = -ENOMEM);
+
+       engine->pte_notifier.notifier_call = cfs_ptask_cpumask_change_notify;
+       rc = padata_register_cpumask_notifier(engine->pte_pinst,
+                                             &engine->pte_notifier);
+       if (rc)
+               GOTO(err_free_padata, rc);
+
+       rc = padata_start(engine->pte_pinst);
+       if (rc)
+               GOTO(err_unregister, rc);
+
+       free_cpumask_var(par_mask);
+       free_cpumask_var(all_mask);
+
+       put_online_cpus();
+       return 0;
+
+err_unregister:
+       padata_unregister_cpumask_notifier(engine->pte_pinst,
+                                          &engine->pte_notifier);
+err_free_padata:
+       padata_free(engine->pte_pinst);
+err_free_par_mask:
+       free_cpumask_var(par_mask);
+err_free_all_mask:
+       free_cpumask_var(all_mask);
+err_destroy_workqueue:
+       destroy_workqueue(engine->pte_wq);
+err:
+       put_online_cpus();
+       return rc;
+}
+
+static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
+{
+       padata_stop(engine->pte_pinst);
+       padata_unregister_cpumask_notifier(engine->pte_pinst,
+                                          &engine->pte_notifier);
+       padata_free(engine->pte_pinst);
+       destroy_workqueue(engine->pte_wq);
+}
+
+#else  /* !CONFIG_PADATA */
+
+static int cfs_ptengine_padata_init(struct cfs_ptask_engine *engine,
+                                   const char *name,
+                                   const struct cpumask *cpumask)
+{
+       engine->pte_weight = 1;
+
+       return 0;
+}
+
+static void cfs_ptengine_padata_fini(struct cfs_ptask_engine *engine)
+{
+}
+#endif /* CONFIG_PADATA */
+
+struct cfs_ptask_engine *cfs_ptengine_init(const char *name,
+                                          const struct cpumask *cpumask)
+{
+       struct cfs_ptask_engine *engine;
+       int rc;
+
+       engine = kzalloc(sizeof(*engine), GFP_KERNEL);
+       if (engine == NULL)
+               GOTO(err, rc = -ENOMEM);
+
+       rc = cfs_ptengine_padata_init(engine, name, cpumask);
+       if (rc)
+               GOTO(err_free_engine, rc);
+
+       return engine;
+
+err_free_engine:
+       kfree(engine);
+err:
+       return ERR_PTR(rc);
+}
+EXPORT_SYMBOL(cfs_ptengine_init);
+
+void cfs_ptengine_fini(struct cfs_ptask_engine *engine)
+{
+       if (IS_ERR_OR_NULL(engine))
+               return;
+
+       cfs_ptengine_padata_fini(engine);
+       kfree(engine);
+}
+EXPORT_SYMBOL(cfs_ptengine_fini);