if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
added = true;
+ if (qmt->qmt_reba_task)
+ wake_up_process(qmt->qmt_reba_task);
}
spin_unlock(&qmt->qmt_reba_lock);
- if (added)
- wake_up(&qmt->qmt_reba_thread.t_ctl_waitq);
- else
+ if (!added)
lqe_putref(lqe);
EXIT;
}
+struct qmt_reba_args {
+ struct qmt_device *qra_dev;
+ struct lu_env qra_env;
+ struct completion *qra_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
/*
* The rebalance thread is in charge of sending glimpse callbacks on per-ID
* quota locks owned by slaves in order to notify them of:
* try to acquire quota from the master since this latter has already
* distributed all the space.
*/
-static int qmt_reba_thread(void *arg)
+static int qmt_reba_thread(void *_args)
{
- struct qmt_device *qmt = (struct qmt_device *)arg;
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
- struct lu_env *env;
+ struct qmt_reba_args *args = _args;
+ struct qmt_device *qmt = args->qra_dev;
+ struct lu_env *env = &args->qra_env;
struct lquota_entry *lqe, *tmp;
- int rc;
ENTRY;
- OBD_ALLOC_PTR(env);
- if (env == NULL) {
- thread_set_flags(thread, SVC_STOPPED);
- RETURN(-ENOMEM);
- }
-
- rc = lu_env_init(env, LCT_MD_THREAD);
- if (rc) {
- CERROR("%s: failed to init env.", qmt->qmt_svname);
- GOTO(out_env, rc);
- }
- rc = lu_env_add(env);
- if (rc)
- GOTO(out_env_fini, rc);
-
- thread_set_flags(thread, SVC_RUNNING);
- wake_up(&thread->t_ctl_waitq);
-
- while (1) {
- wait_event_idle(thread->t_ctl_waitq,
- !list_empty(&qmt->qmt_reba_list) ||
- !thread_is_running(thread));
+ complete(args->qra_started);
+ while (({set_current_state(TASK_IDLE);
+ !kthread_should_stop(); })) {
spin_lock(&qmt->qmt_reba_lock);
list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
lqe_link) {
+ __set_current_state(TASK_RUNNING);
list_del_init(&lqe->lqe_link);
spin_unlock(&qmt->qmt_reba_lock);
- if (thread_is_running(thread))
+ if (!kthread_should_stop())
qmt_id_lock_glimpse(env, qmt, lqe, NULL);
lqe_putref(lqe);
spin_lock(&qmt->qmt_reba_lock);
}
spin_unlock(&qmt->qmt_reba_lock);
-
- if (!thread_is_running(thread))
- break;
+ schedule();
}
+ __set_current_state(TASK_RUNNING);
+
lu_env_remove(env);
-out_env_fini:
lu_env_fini(env);
-out_env:
- OBD_FREE_PTR(env);
- thread_set_flags(thread, SVC_STOPPED);
- wake_up(&thread->t_ctl_waitq);
- RETURN(rc);
+ OBD_FREE_PTR(args);
+ RETURN(0);
}
/*
*/
int qmt_start_reba_thread(struct qmt_device *qmt)
{
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
- struct task_struct *task;
+ struct task_struct *task;
+ struct qmt_reba_args *args;
+ DECLARE_COMPLETION_ONSTACK(started);
+ int rc;
ENTRY;
- task = kthread_run(qmt_reba_thread, (void *)qmt,
- "qmt_reba_%s", qmt->qmt_svname);
+ OBD_ALLOC_PTR(args);
+ if (args == NULL)
+ RETURN(-ENOMEM);
+ args->qra_dev = qmt;
+ args->qra_started = &started;
+
+ rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
+ if (rc) {
+ CERROR("%s: failed to init env.", qmt->qmt_svname);
+ GOTO(out_env, rc);
+ }
+
+ task = kthread_create(qmt_reba_thread, args,
+ "qmt_reba_%s", qmt->qmt_svname);
if (IS_ERR(task)) {
CERROR("%s: failed to start rebalance thread (%ld)\n",
qmt->qmt_svname, PTR_ERR(task));
- thread_set_flags(thread, SVC_STOPPED);
- RETURN(PTR_ERR(task));
+ GOTO(out_env_fini, rc = PTR_ERR(task));
}
- wait_event_idle(thread->t_ctl_waitq,
- thread_is_running(thread) || thread_is_stopped(thread));
+ rc = lu_env_add_task(&args->qra_env, task);
+ if (rc) {
+ kthread_stop(task);
+ GOTO(out_env_fini, rc);
+ }
+ qmt->qmt_reba_task = task;
+ wake_up_process(task);
+ wait_for_completion(&started);
RETURN(0);
+out_env_fini:
+ lu_env_fini(&args->qra_env);
+out_env:
+ OBD_FREE_PTR(args);
+ RETURN(rc);
}
/*
*/
void qmt_stop_reba_thread(struct qmt_device *qmt)
{
- struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
+ struct task_struct *task;
- if (!thread_is_stopped(thread)) {
+ spin_lock(&qmt->qmt_reba_lock);
+ task = qmt->qmt_reba_task;
+ qmt->qmt_reba_task = NULL;
+ spin_unlock(&qmt->qmt_reba_lock);
- thread_set_flags(thread, SVC_STOPPING);
- wake_up(&thread->t_ctl_waitq);
+ if (task)
+ kthread_stop(task);
- wait_event_idle(thread->t_ctl_waitq, thread_is_stopped(thread));
- }
LASSERT(list_empty(&qmt->qmt_reba_list));
}