From 24eeeeef5d56f05b19c94fdd69a5a22ab7384616 Mon Sep 17 00:00:00 2001 From: Mr NeilBrown Date: Wed, 23 Oct 2019 11:30:51 +1100 Subject: [PATCH] LU-12780 quota: don't use ptlrpc_thread for qsd_reint_main() Instead of ptlrpc_thread, use native kthread operations. - perform allocation before starting the thread, and don't bother with synchronization at thread start - use kthread_stop() and kthread_should_stop() to signal shutdown - as the thread can stop independently, use xchg() on qqi->qqi_reint_task to determine if there is a waiter, and if so, the thread should synchronize with the waiter. - use wake_up_var and wait_var_event for event signalling. Also use kthread's thread-name formatting rather than allocating a 'name' for formating the name separately. Signed-off-by: Mr NeilBrown Change-Id: Ia1334a3e865b67ba5ae78c7b8342704a340229f7 Signed-off-by: Mr NeilBrown Reviewed-on: https://review.whamcloud.com/36268 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/quota/qsd_internal.h | 2 +- lustre/quota/qsd_lib.c | 24 ++++++++-- lustre/quota/qsd_reint.c | 109 +++++++++++++++++++++++--------------------- 3 files changed, 79 insertions(+), 56 deletions(-) diff --git a/lustre/quota/qsd_internal.h b/lustre/quota/qsd_internal.h index 18fc0c6..d0654c8 100644 --- a/lustre/quota/qsd_internal.h +++ b/lustre/quota/qsd_internal.h @@ -155,7 +155,7 @@ struct qsd_qtype_info { struct lquota_site *qqi_site; /* Reintegration thread */ - struct ptlrpc_thread qqi_reint_thread; + struct task_struct *qqi_reint_task; /* statistics on operations performed by this slave */ struct lprocfs_stats *qqi_stats; diff --git a/lustre/quota/qsd_lib.c b/lustre/quota/qsd_lib.c index f50ca84..a5a5bc4 100644 --- a/lustre/quota/qsd_lib.c +++ b/lustre/quota/qsd_lib.c @@ -312,7 +312,16 @@ static int qsd_conn_callback(void *data) * step 3) will have to wait for qsd_start() to be called */ for (type = USRQUOTA; type < LL_MAXQUOTAS; type++) { struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; - wake_up(&qqi->qqi_reint_thread.t_ctl_waitq); + struct task_struct *t; + + /* qqi_reint_task can be set to NULL at any time, + * so we need to be careful. + */ + rcu_read_lock(); + t = rcu_dereference(qqi->qqi_reint_task); + if (t) + wake_up_process(t); + rcu_read_unlock(); } RETURN(0); @@ -472,8 +481,6 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd, qqi->qqi_glb_uptodate = false; qqi->qqi_slv_uptodate = false; qqi->qqi_reint = false; - init_waitqueue_head(&qqi->qqi_reint_thread.t_ctl_waitq); - thread_set_flags(&qqi->qqi_reint_thread, SVC_STOPPED); INIT_LIST_HEAD(&qqi->qqi_deferred_glb); INIT_LIST_HEAD(&qqi->qqi_deferred_slv); lquota_generate_fid(&qqi->qqi_fid, QSD_RES_TYPE(qsd), qtype); @@ -889,7 +896,16 @@ int qsd_start(const struct lu_env *env, struct qsd_instance *qsd) * up to usage; If usage < granted, release down to usage. */ for (type = USRQUOTA; type < LL_MAXQUOTAS; type++) { struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; - wake_up(&qqi->qqi_reint_thread.t_ctl_waitq); + struct task_struct *t; + + /* qqi_reint_task can be set to NULL at any time, + * so we need to be careful. + */ + rcu_read_lock(); + t = rcu_dereference(qqi->qqi_reint_task); + if (t) + wake_up_process(t); + rcu_read_unlock(); } RETURN(rc); diff --git a/lustre/quota/qsd_reint.c b/lustre/quota/qsd_reint.c index 4e5a4ad..5eac63f 100644 --- a/lustre/quota/qsd_reint.c +++ b/lustre/quota/qsd_reint.c @@ -409,16 +409,26 @@ static int qsd_started(struct qsd_instance *qsd) return started; } +struct qsd_reint_args { + struct qsd_qtype_info *qra_qqi; + struct lu_env qra_env; + struct completion *qra_started; +}; + +#ifndef TASK_IDLE +#define TASK_IDLE TASK_INTERRUPTIBLE +#endif + /* * Routine executed by the reintegration thread. */ -static int qsd_reint_main(void *args) +static int qsd_reint_main(void *_args) { - struct lu_env *env; + struct qsd_reint_args *args = _args; + struct lu_env *env = &args->qra_env; struct qsd_thread_info *qti; - struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)args; + struct qsd_qtype_info *qqi = args->qra_qqi; struct qsd_instance *qsd = qqi->qqi_qsd; - struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; int rc; ENTRY; @@ -426,27 +436,19 @@ static int qsd_reint_main(void *args) qsd->qsd_svname, PFID(&qqi->qqi_fid)); qqi_getref(qqi); - lu_ref_add(&qqi->qqi_reference, "reint_thread", thread); - - thread_set_flags(thread, SVC_RUNNING); - wake_up(&thread->t_ctl_waitq); - - OBD_ALLOC_PTR(env); - if (env == NULL) - GOTO(out, rc = -ENOMEM); - - /* initialize environment */ - rc = lu_env_init(env, LCT_DT_THREAD); - if (rc) - GOTO(out_env, rc); + lu_ref_add(&qqi->qqi_reference, "reint_thread", current); qti = qsd_info(env); + complete(args->qra_started); + /* wait for the connection to master established */ - wait_event_idle(thread->t_ctl_waitq, - qsd_connected(qsd) || !thread_is_running(thread)); + while (({set_current_state(TASK_IDLE); + !qsd_connected(qsd) && !kthread_should_stop(); })) + schedule(); + __set_current_state(TASK_RUNNING); /* Step 1: enqueue global index lock */ - if (!thread_is_running(thread)) + if (kthread_should_stop()) GOTO(out_env_init, rc = 0); LASSERT(qsd->qsd_exp != NULL); @@ -483,7 +485,7 @@ static int qsd_reint_main(void *args) } /* Step 2: reintegrate global index */ - if (!thread_is_running(thread)) + if (kthread_should_stop()) GOTO(out_lock, rc = 0); OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REINT, 10); @@ -500,7 +502,7 @@ static int qsd_reint_main(void *args) } /* Step 3: reintegrate slave index */ - if (!thread_is_running(thread)) + if (kthread_should_stop()) GOTO(out_lock, rc = 0); if (qqi->qqi_slv_ver != qti->qti_slv_ver) { @@ -515,10 +517,12 @@ static int qsd_reint_main(void *args) } /* wait for the qsd instance started (target recovery done) */ - wait_event_idle(thread->t_ctl_waitq, - qsd_started(qsd) || !thread_is_running(thread)); + while (({set_current_state(TASK_IDLE); + !qsd_started(qsd) && !kthread_should_stop(); })) + schedule(); + __set_current_state(TASK_RUNNING); - if (!thread_is_running(thread)) + if (kthread_should_stop()) GOTO(out_lock, rc = 0); /* Step 4: start reconciliation for each enforced ID */ @@ -532,17 +536,15 @@ out_lock: ldlm_lock_decref(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode); out_env_init: lu_env_fini(env); -out_env: - OBD_FREE_PTR(env); -out: + OBD_FREE_PTR(args); write_lock(&qsd->qsd_lock); qqi->qqi_reint = 0; write_unlock(&qsd->qsd_lock); - thread_set_flags(thread, SVC_STOPPED); - wake_up(&thread->t_ctl_waitq); + if (xchg(&qqi->qqi_reint_task, NULL) == NULL) + wait_var_event(qqi, kthread_should_stop()); - lu_ref_del(&qqi->qqi_reference, "reint_thread", thread); + lu_ref_del(&qqi->qqi_reference, "reint_thread", current); qqi_putref(qqi); return rc; @@ -550,15 +552,11 @@ out: void qsd_stop_reint_thread(struct qsd_qtype_info *qqi) { - struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; - - if (!thread_is_stopped(thread)) { - thread_set_flags(thread, SVC_STOPPING); - wake_up(&thread->t_ctl_waitq); + struct task_struct *task; - wait_event_idle(thread->t_ctl_waitq, - thread_is_stopped(thread)); - } + task = xchg(&qqi->qqi_reint_task, NULL); + if (task) + kthread_stop(task); } static int qsd_entry_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, @@ -629,11 +627,11 @@ out: int qsd_start_reint_thread(struct qsd_qtype_info *qqi) { - struct ptlrpc_thread *thread = &qqi->qqi_reint_thread; struct qsd_instance *qsd = qqi->qqi_qsd; struct task_struct *task; + struct qsd_reint_args *args; + DECLARE_COMPLETION_ONSTACK(started); int rc; - char *name; ENTRY; /* do not try to start a new thread as this can lead to a deadlock */ @@ -673,26 +671,35 @@ int qsd_start_reint_thread(struct qsd_qtype_info *qqi) RETURN(0); } - OBD_ALLOC(name, MTI_NAME_MAXLEN); - if (name == NULL) - RETURN(-ENOMEM); - snprintf(name, MTI_NAME_MAXLEN, "qsd_reint_%d.%s", - qqi->qqi_qtype, qsd->qsd_svname); + OBD_ALLOC_PTR(args); + if (args == NULL) + GOTO(out, rc = -ENOMEM); - task = kthread_run(qsd_reint_main, qqi, name); - OBD_FREE(name, MTI_NAME_MAXLEN); + args->qra_started = &started; + args->qra_qqi = qqi; + /* initialize environment */ + rc = lu_env_init(&args->qra_env, LCT_DT_THREAD); + if (rc) + GOTO(out_args, rc); + task = kthread_create(qsd_reint_main, args, "qsd_reint_%d.%s", + qqi->qqi_qtype, qsd->qsd_svname); if (IS_ERR(task)) { rc = PTR_ERR(task); - thread_set_flags(thread, SVC_STOPPED); + lu_env_fini(&args->qra_env); +out_args: + OBD_FREE_PTR(args); +out: write_lock(&qsd->qsd_lock); qqi->qqi_reint = 0; write_unlock(&qsd->qsd_lock); RETURN(rc); } - wait_event_idle(thread->t_ctl_waitq, - thread_is_running(thread) || thread_is_stopped(thread)); + qqi->qqi_reint_task = task; + wake_up_process(task); + wait_for_completion(&started); + RETURN(0); } -- 1.8.3.1