From 8afc9590a78f6b20c338c027e17043bb22012817 Mon Sep 17 00:00:00 2001 From: Mr NeilBrown Date: Wed, 23 Oct 2019 11:30:50 +1100 Subject: [PATCH] LU-12780 osp: don't use ptlrpc_thread for osp_sync_thread() Instead of ptlrpc_thread, just use native kthread functionality. - Move allocations to before the thread is created, so error reporting is easier. - add a completion so that we can be sure that the thread is stared, and hence know that cleanup will eventually happen. - use kthread_stop() to tell the thread to stop, and kthread_should_stop() to see if it has bene told. - clear ->opd_sync_task before calling kthread_stop(), and use a test for NULL to see if the thread is stopping - As some error conditions can cause the thread to exit before being asked, and as kthread_stop() doesn't handle this possibility, add an extra handshake. Both thread-exit and osp_sync_fini() atomically exchange NULL for the thread pointer. If thread-exit is first, the exit proceeds and osp_sync_fini() never waits. For osp_sync_fini() is first, it calls kthread_exit() to wait, and thread-exit doesn't happen until it is sure that kthread_stop() is waiting. Signed-off-by: Mr NeilBrown Change-Id: I952cbdae90252476853854d89b8f1a78d95ddbde Reviewed-on: https://review.whamcloud.com/36265 Tested-by: jenkins Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/osp/osp_internal.h | 2 +- lustre/osp/osp_sync.c | 121 +++++++++++++++++++++------------------------- 2 files changed, 55 insertions(+), 68 deletions(-) diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 02c153d..348c0d7 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -217,7 +217,7 @@ struct osp_device { /* processing of changes from previous mount is done? */ int opd_sync_prev_done; /* found records */ - struct ptlrpc_thread opd_sync_thread; + struct task_struct *opd_sync_task; wait_queue_head_t opd_sync_waitq; /* list of in flight rpcs */ struct list_head opd_sync_in_flight_list; diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 4df823e..9ea2346 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -99,24 +99,6 @@ struct osp_job_req_args { static int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d, struct thandle *th); -static inline int osp_sync_running(struct osp_device *d) -{ - return !!(d->opd_sync_thread.t_flags & SVC_RUNNING); -} - -/** - * Check status: whether OSP thread has stopped - * - * \param[in] d OSP device - * - * \retval 0 still running - * \retval 1 stopped - */ -static inline int osp_sync_stopped(struct osp_device *d) -{ - return !!(d->opd_sync_thread.t_flags & SVC_STOPPED); -} - /* ** Check for new changes to sync * @@ -1126,8 +1108,9 @@ static void osp_sync_process_committed(const struct lu_env *env, osp_sync_check_for_work(d); /* wake up the thread if requested to stop: - * it might be waiting for in-progress to complete */ - if (unlikely(osp_sync_running(d) == 0)) + * it might be waiting for in-progress to complete + */ + if (atomic_read(&d->opd_sync_rpcs_in_progress) == 0) wake_up(&d->opd_sync_waitq); EXIT; @@ -1157,7 +1140,7 @@ static int osp_sync_process_queues(const struct lu_env *env, struct osp_device *d = data; do { - if (!osp_sync_running(d)) { + if (!d->opd_sync_task) { CDEBUG(D_HA, "stop llog processing\n"); return LLOG_PROC_BREAK; } @@ -1181,12 +1164,18 @@ static int osp_sync_process_queues(const struct lu_env *env, msleep(1 * MSEC_PER_SEC); wait_event_idle(d->opd_sync_waitq, - !osp_sync_running(d) || + !d->opd_sync_task || osp_sync_can_process_new(d, rec) || !list_empty(&d->opd_sync_committed_there)); } while (1); } +struct osp_sync_args { + struct osp_device *osa_dev; + struct lu_env osa_env; + struct completion *osa_started; +}; + /** * OSP sync thread. * @@ -1207,37 +1196,20 @@ static int osp_sync_process_queues(const struct lu_env *env, * \retval 0 on success * \retval negative negated errno on error */ -static int osp_sync_thread(void *_arg) +static int osp_sync_thread(void *_args) { - struct osp_device *d = _arg; - struct ptlrpc_thread *thread = &d->opd_sync_thread; + struct osp_sync_args *args = _args; + struct osp_device *d = args->osa_dev; struct llog_ctxt *ctxt; struct obd_device *obd = d->opd_obd; struct llog_handle *llh; - struct lu_env env; + struct lu_env *env = &args->osa_env; int rc, count; bool wrapped; ENTRY; - rc = lu_env_init(&env, LCT_LOCAL); - if (rc) { - CERROR("%s: can't initialize env: rc = %d\n", - obd->obd_name, rc); - - spin_lock(&d->opd_sync_lock); - thread->t_flags = SVC_STOPPED; - spin_unlock(&d->opd_sync_lock); - wake_up(&thread->t_ctl_waitq); - - RETURN(rc); - } - - spin_lock(&d->opd_sync_lock); - thread->t_flags = SVC_RUNNING; - spin_unlock(&d->opd_sync_lock); - wake_up(&thread->t_ctl_waitq); - + complete(args->osa_started); again: ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); if (ctxt == NULL) { @@ -1269,7 +1241,7 @@ again: rc = -EINPROGRESS; goto next; } - rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, + rc = llog_cat_process(env, llh, osp_sync_process_queues, d, d->opd_sync_last_catalog_idx, 0); next: @@ -1290,8 +1262,8 @@ next: if (rc == -EINPROGRESS) { /* can't access the llog now - OI scrub is trying to fix * underlying issue. let's wait and try again */ - llog_cat_close(&env, llh); - rc = llog_cleanup(&env, ctxt); + llog_cat_close(env, llh); + rc = llog_cleanup(env, ctxt); if (rc) GOTO(out, rc); schedule_timeout_interruptible(cfs_time_seconds(5)); @@ -1309,7 +1281,7 @@ next: atomic_read(&d->opd_sync_rpcs_in_flight), rc); /* we don't expect llog_process_thread() to exit till umount */ - LASSERTF(thread->t_flags != SVC_RUNNING, + LASSERTF(kthread_should_stop(), "%u changes, %u in progress, %u in flight\n", atomic_read(&d->opd_sync_changes), atomic_read(&d->opd_sync_rpcs_in_progress), @@ -1318,7 +1290,7 @@ next: /* wait till all the requests are completed */ count = 0; while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) { - osp_sync_process_committed(&env, d); + osp_sync_process_committed(env, d); rc = wait_event_idle_timeout( d->opd_sync_waitq, @@ -1335,8 +1307,8 @@ next: } close: - llog_cat_close(&env, llh); - rc = llog_cleanup(&env, ctxt); + llog_cat_close(env, llh); + rc = llog_cleanup(env, ctxt); if (rc) CERROR("can't cleanup llog: %d\n", rc); out: @@ -1346,11 +1318,13 @@ out: atomic_read(&d->opd_sync_rpcs_in_flight), list_empty(&d->opd_sync_committed_there) ? "" : "!"); - thread->t_flags = SVC_STOPPED; - - wake_up(&thread->t_ctl_waitq); + lu_env_fini(env); - lu_env_fini(&env); + if (xchg(&d->opd_sync_task, NULL) == NULL) + /* already being waited for */ + wait_event_interruptible(d->opd_sync_waitq, + kthread_should_stop()); + OBD_FREE_PTR(args); RETURN(0); } @@ -1527,6 +1501,8 @@ static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d) int osp_sync_init(const struct lu_env *env, struct osp_device *d) { struct task_struct *task; + struct osp_sync_args *args; + DECLARE_COMPLETION_ONSTACK(started); int rc; ENTRY; @@ -1536,14 +1512,17 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) spin_lock_init(&d->opd_sync_lock); init_waitqueue_head(&d->opd_sync_waitq); init_waitqueue_head(&d->opd_sync_barrier_waitq); - thread_set_flags(&d->opd_sync_thread, SVC_INIT); - init_waitqueue_head(&d->opd_sync_thread.t_ctl_waitq); INIT_LIST_HEAD(&d->opd_sync_in_flight_list); INIT_LIST_HEAD(&d->opd_sync_committed_there); if (d->opd_storage->dd_rdonly) RETURN(0); + OBD_ALLOC_PTR(args); + if (!args) + GOTO(err_id, rc = -ENOMEM); + args->osa_dev = d; + args->osa_started = &started; /* * initialize llog storing changes */ @@ -1554,25 +1533,35 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d) GOTO(err_id, rc); } + rc = lu_env_init(&args->osa_env, LCT_LOCAL); + if (rc) { + CERROR("%s: can't initialize env: rc = %d\n", + d->opd_obd->obd_name, rc); + GOTO(err_llog, rc); + } + /* * Start synchronization thread */ - task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u", + task = kthread_create(osp_sync_thread, args, "osp-syn-%u-%u", d->opd_index, d->opd_group); if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("%s: cannot start sync thread: rc = %d\n", d->opd_obd->obd_name, rc); + lu_env_fini(&args->osa_env); GOTO(err_llog, rc); } - - wait_event_idle(d->opd_sync_thread.t_ctl_waitq, - osp_sync_running(d) || osp_sync_stopped(d)); + d->opd_sync_task = task; + wake_up_process(task); + wait_for_completion(&started); RETURN(0); err_llog: osp_sync_llog_fini(env, d); err_id: + if (args) + OBD_FREE_PTR(args); return rc; } @@ -1587,15 +1576,13 @@ err_id: */ int osp_sync_fini(struct osp_device *d) { - struct ptlrpc_thread *thread = &d->opd_sync_thread; + struct task_struct *task; ENTRY; - if (!thread_is_init(thread) && !thread_is_stopped(thread)) { - thread->t_flags = SVC_STOPPING; - wake_up(&d->opd_sync_waitq); - wait_event(thread->t_ctl_waitq, thread_is_stopped(thread)); - } + task = xchg(&d->opd_sync_task, NULL); + if (task) + kthread_stop(task); RETURN(0); } -- 1.8.3.1