static int osp_sync_add_commit_cb(const struct lu_env *env,
struct osp_device *d, struct thandle *th);
-static inline int osp_sync_running(struct osp_device *d)
-{
- return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
-}
-
-/**
- * Check status: whether OSP thread has stopped
- *
- * \param[in] d OSP device
- *
- * \retval 0 still running
- * \retval 1 stopped
- */
-static inline int osp_sync_stopped(struct osp_device *d)
-{
- return !!(d->opd_sync_thread.t_flags & SVC_STOPPED);
-}
-
/*
** Check for new changes to sync
*
osp_sync_check_for_work(d);
/* wake up the thread if requested to stop:
- * it might be waiting for in-progress to complete */
- if (unlikely(osp_sync_running(d) == 0))
+ * it might be waiting for in-progress to complete
+ */
+ if (atomic_read(&d->opd_sync_rpcs_in_progress) == 0)
wake_up(&d->opd_sync_waitq);
EXIT;
struct osp_device *d = data;
do {
- if (!osp_sync_running(d)) {
+ if (!d->opd_sync_task) {
CDEBUG(D_HA, "stop llog processing\n");
return LLOG_PROC_BREAK;
}
msleep(1 * MSEC_PER_SEC);
wait_event_idle(d->opd_sync_waitq,
- !osp_sync_running(d) ||
+ !d->opd_sync_task ||
osp_sync_can_process_new(d, rec) ||
!list_empty(&d->opd_sync_committed_there));
} while (1);
}
+struct osp_sync_args {
+ struct osp_device *osa_dev;
+ struct lu_env osa_env;
+ struct completion *osa_started;
+};
+
/**
* OSP sync thread.
*
* \retval 0 on success
* \retval negative negated errno on error
*/
-static int osp_sync_thread(void *_arg)
+static int osp_sync_thread(void *_args)
{
- struct osp_device *d = _arg;
- struct ptlrpc_thread *thread = &d->opd_sync_thread;
+ struct osp_sync_args *args = _args;
+ struct osp_device *d = args->osa_dev;
struct llog_ctxt *ctxt;
struct obd_device *obd = d->opd_obd;
struct llog_handle *llh;
- struct lu_env env;
+ struct lu_env *env = &args->osa_env;
int rc, count;
bool wrapped;
ENTRY;
- rc = lu_env_init(&env, LCT_LOCAL);
- if (rc) {
- CERROR("%s: can't initialize env: rc = %d\n",
- obd->obd_name, rc);
-
- spin_lock(&d->opd_sync_lock);
- thread->t_flags = SVC_STOPPED;
- spin_unlock(&d->opd_sync_lock);
- wake_up(&thread->t_ctl_waitq);
-
- RETURN(rc);
- }
-
- spin_lock(&d->opd_sync_lock);
- thread->t_flags = SVC_RUNNING;
- spin_unlock(&d->opd_sync_lock);
- wake_up(&thread->t_ctl_waitq);
-
+ complete(args->osa_started);
again:
ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
if (ctxt == NULL) {
rc = -EINPROGRESS;
goto next;
}
- rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
+ rc = llog_cat_process(env, llh, osp_sync_process_queues, d,
d->opd_sync_last_catalog_idx, 0);
next:
if (rc == -EINPROGRESS) {
/* can't access the llog now - OI scrub is trying to fix
* underlying issue. let's wait and try again */
- llog_cat_close(&env, llh);
- rc = llog_cleanup(&env, ctxt);
+ llog_cat_close(env, llh);
+ rc = llog_cleanup(env, ctxt);
if (rc)
GOTO(out, rc);
schedule_timeout_interruptible(cfs_time_seconds(5));
atomic_read(&d->opd_sync_rpcs_in_flight), rc);
/* we don't expect llog_process_thread() to exit till umount */
- LASSERTF(thread->t_flags != SVC_RUNNING,
+ LASSERTF(kthread_should_stop(),
"%u changes, %u in progress, %u in flight\n",
atomic_read(&d->opd_sync_changes),
atomic_read(&d->opd_sync_rpcs_in_progress),
/* wait till all the requests are completed */
count = 0;
while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
- osp_sync_process_committed(&env, d);
+ osp_sync_process_committed(env, d);
rc = wait_event_idle_timeout(
d->opd_sync_waitq,
}
close:
- llog_cat_close(&env, llh);
- rc = llog_cleanup(&env, ctxt);
+ llog_cat_close(env, llh);
+ rc = llog_cleanup(env, ctxt);
if (rc)
CERROR("can't cleanup llog: %d\n", rc);
out:
atomic_read(&d->opd_sync_rpcs_in_flight),
list_empty(&d->opd_sync_committed_there) ? "" : "!");
- thread->t_flags = SVC_STOPPED;
-
- wake_up(&thread->t_ctl_waitq);
+ lu_env_fini(env);
- lu_env_fini(&env);
+ if (xchg(&d->opd_sync_task, NULL) == NULL)
+ /* already being waited for */
+ wait_event_interruptible(d->opd_sync_waitq,
+ kthread_should_stop());
+ OBD_FREE_PTR(args);
RETURN(0);
}
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
struct task_struct *task;
+ struct osp_sync_args *args;
+ DECLARE_COMPLETION_ONSTACK(started);
int rc;
ENTRY;
spin_lock_init(&d->opd_sync_lock);
init_waitqueue_head(&d->opd_sync_waitq);
init_waitqueue_head(&d->opd_sync_barrier_waitq);
- thread_set_flags(&d->opd_sync_thread, SVC_INIT);
- init_waitqueue_head(&d->opd_sync_thread.t_ctl_waitq);
INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
INIT_LIST_HEAD(&d->opd_sync_committed_there);
if (d->opd_storage->dd_rdonly)
RETURN(0);
+ OBD_ALLOC_PTR(args);
+ if (!args)
+ GOTO(err_id, rc = -ENOMEM);
+ args->osa_dev = d;
+ args->osa_started = &started;
/*
* initialize llog storing changes
*/
GOTO(err_id, rc);
}
+ rc = lu_env_init(&args->osa_env, LCT_LOCAL);
+ if (rc) {
+ CERROR("%s: can't initialize env: rc = %d\n",
+ d->opd_obd->obd_name, rc);
+ GOTO(err_llog, rc);
+ }
+
/*
* Start synchronization thread
*/
- task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
+ task = kthread_create(osp_sync_thread, args, "osp-syn-%u-%u",
d->opd_index, d->opd_group);
if (IS_ERR(task)) {
rc = PTR_ERR(task);
CERROR("%s: cannot start sync thread: rc = %d\n",
d->opd_obd->obd_name, rc);
+ lu_env_fini(&args->osa_env);
GOTO(err_llog, rc);
}
-
- wait_event_idle(d->opd_sync_thread.t_ctl_waitq,
- osp_sync_running(d) || osp_sync_stopped(d));
+ d->opd_sync_task = task;
+ wake_up_process(task);
+ wait_for_completion(&started);
RETURN(0);
err_llog:
osp_sync_llog_fini(env, d);
err_id:
+ if (args)
+ OBD_FREE_PTR(args);
return rc;
}
*/
int osp_sync_fini(struct osp_device *d)
{
- struct ptlrpc_thread *thread = &d->opd_sync_thread;
+ struct task_struct *task;
ENTRY;
- if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
- thread->t_flags = SVC_STOPPING;
- wake_up(&d->opd_sync_waitq);
- wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
- }
+ task = xchg(&d->opd_sync_task, NULL);
+ if (task)
+ kthread_stop(task);
RETURN(0);
}