return 0;
}
-static inline int
-distribute_txn_commit_thread_running(struct lu_target *lut)
-{
- return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING;
-}
-
-static inline int
-distribute_txn_commit_thread_stopped(struct lu_target *lut)
-{
- return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED;
-}
-
/**
* Top thandle commit callback
*
top_multiple_thandle_dump(tmt, D_HA);
tmt->tmt_committed = 1;
lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt;
- if (distribute_txn_commit_thread_running(lut))
- wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq);
+ if (lut->lut_tdtd && lut->lut_tdtd->tdtd_commit_task)
+ wake_up_process(lut->lut_tdtd->tdtd_commit_task);
+
RETURN_EXIT;
}
top_multiple_thandle_get(new);
top_multiple_thandle_dump(new, D_INFO);
- if (new->tmt_committed && at_head)
- wake_up(&tdtd->tdtd_commit_thread_waitq);
+ if (new->tmt_committed && at_head && tdtd->tdtd_commit_task)
+ wake_up_process(tdtd->tdtd_commit_task);
}
/**
RETURN(0);
}
-/**
- * Check if there are committed transaction
- *
- * Check if there are committed transaction in the distribute transaction
- * list, then cancel the update records for those committed transaction.
- * Because the distribute transaction in the list are sorted by batchid,
- * and cancellation will be done by batchid order, so we only check the first
- * the transaction(with lowest batchid) in the list.
- *
- * \param[in] lod lod device where cancel thread is
- *
- * \retval true if it is ready
- * \retval false if it is not ready
- */
-static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd)
-{
- struct top_multiple_thandle *tmt = NULL;
- struct obd_device *obd = tdtd->tdtd_lut->lut_obd;
- bool ready = false;
-
- spin_lock(&tdtd->tdtd_batchid_lock);
- if (!list_empty(&tdtd->tdtd_list)) {
- tmt = list_entry(tdtd->tdtd_list.next,
- struct top_multiple_thandle, tmt_commit_list);
- if (tmt->tmt_committed &&
- (!obd->obd_recovering || (obd->obd_recovering &&
- tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)))
- ready = true;
- }
- spin_unlock(&tdtd->tdtd_batchid_lock);
-
- return ready;
-}
-
struct distribute_txn_bid_data {
struct dt_txn_commit_cb dtbd_cb;
struct target_distribute_txn_data *dtbd_tdtd;
!tdtd->tdtd_lut->lut_obd->obd_no_transno)
tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid;
spin_unlock(&tdtd->tdtd_batchid_lock);
- atomic_dec(&tdtd->tdtd_refcount);
- wake_up(&tdtd->tdtd_commit_thread_waitq);
+ if (atomic_dec_and_test(&tdtd->tdtd_refcount))
+ wake_up_process(tdtd->tdtd_commit_task);
OBD_FREE_PTR(dtbd);
}
return rc;
}
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
/**
* manage the distribute transaction thread
*
static int distribute_txn_commit_thread(void *_arg)
{
struct target_distribute_txn_data *tdtd = _arg;
- struct lu_target *lut = tdtd->tdtd_lut;
- struct ptlrpc_thread *thread = &lut->lut_tdtd_commit_thread;
- struct lu_env env;
+ struct lu_env *env = &tdtd->tdtd_env;
LIST_HEAD(list);
int rc;
struct top_multiple_thandle *tmt;
ENTRY;
- rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
- if (rc != 0)
- RETURN(rc);
-
- spin_lock(&tdtd->tdtd_batchid_lock);
- thread->t_flags = SVC_RUNNING;
- spin_unlock(&tdtd->tdtd_batchid_lock);
- wake_up(&thread->t_ctl_waitq);
CDEBUG(D_HA, "%s: start commit thread committed batchid %llu\n",
tdtd->tdtd_lut->lut_obd->obd_name,
tdtd->tdtd_committed_batchid);
- while (distribute_txn_commit_thread_running(lut)) {
+ while (({set_current_state(TASK_IDLE);
+ !kthread_should_stop(); })) {
spin_lock(&tdtd->tdtd_batchid_lock);
list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
tmt_commit_list) {
* the recoverying is done, unless the update records
* batchid < committed_batchid. */
if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) {
+ __set_current_state(TASK_RUNNING);
list_move_tail(&tmt->tmt_commit_list, &list);
} else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
+ __set_current_state(TASK_RUNNING);
LASSERTF(tmt->tmt_batchid >= batchid,
"tmt %p tmt_batchid: %llu, batchid "
"%llu\n", tmt, tmt->tmt_batchid,
tdtd->tdtd_committed_batchid);
/* update globally committed on a storage */
if (batchid > tdtd->tdtd_committed_batchid) {
- rc = distribute_txn_commit_batchid_update(&env, tdtd,
+ __set_current_state(TASK_RUNNING);
+ rc = distribute_txn_commit_batchid_update(env, tdtd,
batchid);
if (rc == 0)
batchid = 0;
list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
if (tmt->tmt_batchid > committed)
break;
+ __set_current_state(TASK_RUNNING);
list_del_init(&tmt->tmt_commit_list);
if (tmt->tmt_result <= 0)
- distribute_txn_cancel_records(&env, tmt);
+ distribute_txn_cancel_records(env, tmt);
top_multiple_thandle_put(tmt);
}
- wait_event_idle(tdtd->tdtd_commit_thread_waitq,
- !distribute_txn_commit_thread_running(lut) ||
- committed < tdtd->tdtd_committed_batchid ||
- tdtd_ready_for_cancel_log(tdtd));
- };
+ if (current->state)
+ schedule();
+ }
- wait_event_idle(tdtd->tdtd_commit_thread_waitq,
- atomic_read(&tdtd->tdtd_refcount) == 0);
+ while (({set_current_state(TASK_IDLE);
+ atomic_read(&tdtd->tdtd_refcount) != 0; }))
+ schedule();
+ __set_current_state(TASK_RUNNING);
spin_lock(&tdtd->tdtd_batchid_lock);
list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
top_multiple_thandle_dump(tmt, D_HA);
top_multiple_thandle_put(tmt);
}
-
- thread->t_flags = SVC_STOPPED;
- lu_env_fini(&env);
- wake_up(&thread->t_ctl_waitq);
-
RETURN(0);
}
tdtd->tdtd_batchid = lut->lut_last_transno + 1;
- init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
- init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
init_waitqueue_head(&tdtd->tdtd_recovery_threads_waitq);
atomic_set(&tdtd->tdtd_refcount, 0);
atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
if (rc != 0)
RETURN(rc);
- task = kthread_run(distribute_txn_commit_thread, tdtd, "dist_txn-%u",
- index);
- if (IS_ERR(task))
+ rc = lu_env_init(&tdtd->tdtd_env, LCT_LOCAL | LCT_MD_THREAD);
+ if (rc)
+ RETURN(rc);
+
+ task = kthread_create(distribute_txn_commit_thread, tdtd, "dist_txn-%u",
+ index);
+ if (IS_ERR(task)) {
+ lu_env_fini(&tdtd->tdtd_env);
RETURN(PTR_ERR(task));
+ }
+ tdtd->tdtd_commit_task = task;
+ wake_up_process(task);
- wait_event_idle(lut->lut_tdtd_commit_thread.t_ctl_waitq,
- distribute_txn_commit_thread_running(lut) ||
- distribute_txn_commit_thread_stopped(lut));
RETURN(0);
}
EXPORT_SYMBOL(distribute_txn_init);
void distribute_txn_fini(const struct lu_env *env,
struct target_distribute_txn_data *tdtd)
{
- struct lu_target *lut = tdtd->tdtd_lut;
+ struct top_multiple_thandle *tmt;
+ LIST_HEAD(list);
/* Stop cancel thread */
- if (lut == NULL || !distribute_txn_commit_thread_running(lut))
+ if (!tdtd->tdtd_commit_task)
return;
+ kthread_stop(tdtd->tdtd_commit_task);
+ tdtd->tdtd_commit_task = NULL;
+
spin_lock(&tdtd->tdtd_batchid_lock);
- lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING;
+ list_splice_init(&tdtd->tdtd_list, &list);
spin_unlock(&tdtd->tdtd_batchid_lock);
- wake_up(&tdtd->tdtd_commit_thread_waitq);
- wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
- lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED);
+
+ CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n",
+ tdtd->tdtd_lut->lut_obd->obd_name);
+ while ((tmt = list_first_entry_or_null(&list,
+ struct top_multiple_thandle,
+ tmt_commit_list)) != NULL) {
+ list_del_init(&tmt->tmt_commit_list);
+ top_multiple_thandle_dump(tmt, D_HA);
+ top_multiple_thandle_put(tmt);
+ }
+
+ lu_env_fini(&tdtd->tdtd_env);
dtrq_list_destroy(tdtd);
if (tdtd->tdtd_batchid_obj != NULL) {