Whamcloud - gitweb
LU-12780 target: don't use ptlrpc_thread for txn_commit_thread
[fs/lustre-release.git] / lustre / target / update_trans.c
index 4d9a4c0..863d52f 100644 (file)
@@ -387,18 +387,6 @@ static int prepare_writing_updates(const struct lu_env *env,
        return 0;
 }
 
-static inline int
-distribute_txn_commit_thread_running(struct lu_target *lut)
-{
-       return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING;
-}
-
-static inline int
-distribute_txn_commit_thread_stopped(struct lu_target *lut)
-{
-       return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED;
-}
-
 /**
  * Top thandle commit callback
  *
@@ -416,8 +404,9 @@ static void top_trans_committed_cb(struct top_multiple_thandle *tmt)
        top_multiple_thandle_dump(tmt, D_HA);
        tmt->tmt_committed = 1;
        lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt;
-       if (distribute_txn_commit_thread_running(lut))
-               wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq);
+       if (lut->lut_tdtd && lut->lut_tdtd->tdtd_commit_task)
+               wake_up_process(lut->lut_tdtd->tdtd_commit_task);
+
        RETURN_EXIT;
 }
 
@@ -723,8 +712,8 @@ void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new)
 
        top_multiple_thandle_get(new);
        top_multiple_thandle_dump(new, D_INFO);
-       if (new->tmt_committed && at_head)
-               wake_up(&tdtd->tdtd_commit_thread_waitq);
+       if (new->tmt_committed && at_head && tdtd->tdtd_commit_task)
+               wake_up_process(tdtd->tdtd_commit_task);
 }
 
 /**
@@ -1316,40 +1305,6 @@ static int distribute_txn_cancel_records(const struct lu_env *env,
        RETURN(0);
 }
 
-/**
- * Check if there are committed transaction
- *
- * Check if there are committed transaction in the distribute transaction
- * list, then cancel the update records for those committed transaction.
- * Because the distribute transaction in the list are sorted by batchid,
- * and cancellation will be done by batchid order, so we only check the first
- * the transaction(with lowest batchid) in the list.
- *
- * \param[in] lod      lod device where cancel thread is
- *
- * \retval             true if it is ready
- * \retval             false if it is not ready
- */
-static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd)
-{
-       struct top_multiple_thandle     *tmt = NULL;
-       struct obd_device               *obd = tdtd->tdtd_lut->lut_obd;
-       bool    ready = false;
-
-       spin_lock(&tdtd->tdtd_batchid_lock);
-       if (!list_empty(&tdtd->tdtd_list)) {
-               tmt = list_entry(tdtd->tdtd_list.next,
-                                struct top_multiple_thandle, tmt_commit_list);
-               if (tmt->tmt_committed &&
-                   (!obd->obd_recovering || (obd->obd_recovering &&
-                   tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)))
-                       ready = true;
-       }
-       spin_unlock(&tdtd->tdtd_batchid_lock);
-
-       return ready;
-}
-
 struct distribute_txn_bid_data {
        struct dt_txn_commit_cb  dtbd_cb;
        struct target_distribute_txn_data      *dtbd_tdtd;
@@ -1385,8 +1340,8 @@ static void distribute_txn_batchid_cb(struct lu_env *env,
            !tdtd->tdtd_lut->lut_obd->obd_no_transno)
                tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid;
        spin_unlock(&tdtd->tdtd_batchid_lock);
-       atomic_dec(&tdtd->tdtd_refcount);
-       wake_up(&tdtd->tdtd_commit_thread_waitq);
+       if (atomic_dec_and_test(&tdtd->tdtd_refcount))
+               wake_up_process(tdtd->tdtd_commit_task);
 
        OBD_FREE_PTR(dtbd);
 }
@@ -1535,6 +1490,10 @@ out_put:
        return rc;
 }
 
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
 /**
  * manage the distribute transaction thread
  *
@@ -1550,9 +1509,7 @@ out_put:
 static int distribute_txn_commit_thread(void *_arg)
 {
        struct target_distribute_txn_data *tdtd = _arg;
-       struct lu_target        *lut = tdtd->tdtd_lut;
-       struct ptlrpc_thread    *thread = &lut->lut_tdtd_commit_thread;
-       struct lu_env            env;
+       struct lu_env           *env = &tdtd->tdtd_env;
        LIST_HEAD(list);
        int                      rc;
        struct top_multiple_thandle *tmt;
@@ -1561,20 +1518,13 @@ static int distribute_txn_commit_thread(void *_arg)
 
        ENTRY;
 
-       rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
-       if (rc != 0)
-               RETURN(rc);
-
-       spin_lock(&tdtd->tdtd_batchid_lock);
-       thread->t_flags = SVC_RUNNING;
-       spin_unlock(&tdtd->tdtd_batchid_lock);
-       wake_up(&thread->t_ctl_waitq);
 
        CDEBUG(D_HA, "%s: start commit thread committed batchid %llu\n",
               tdtd->tdtd_lut->lut_obd->obd_name,
               tdtd->tdtd_committed_batchid);
 
-       while (distribute_txn_commit_thread_running(lut)) {
+       while (({set_current_state(TASK_IDLE);
+                !kthread_should_stop(); })) {
                spin_lock(&tdtd->tdtd_batchid_lock);
                list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
                                         tmt_commit_list) {
@@ -1587,8 +1537,10 @@ static int distribute_txn_commit_thread(void *_arg)
                         * the recoverying is done, unless the update records
                         * batchid < committed_batchid. */
                        if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) {
+                               __set_current_state(TASK_RUNNING);
                                list_move_tail(&tmt->tmt_commit_list, &list);
                        } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
+                               __set_current_state(TASK_RUNNING);
                                LASSERTF(tmt->tmt_batchid >= batchid,
                                         "tmt %p tmt_batchid: %llu, batchid "
                                          "%llu\n", tmt, tmt->tmt_batchid,
@@ -1624,7 +1576,8 @@ static int distribute_txn_commit_thread(void *_arg)
                       tdtd->tdtd_committed_batchid);
                /* update globally committed on a storage */
                if (batchid > tdtd->tdtd_committed_batchid) {
-                       rc = distribute_txn_commit_batchid_update(&env, tdtd,
+                       __set_current_state(TASK_RUNNING);
+                       rc = distribute_txn_commit_batchid_update(env, tdtd,
                                                             batchid);
                        if (rc == 0)
                                batchid = 0;
@@ -1635,20 +1588,21 @@ static int distribute_txn_commit_thread(void *_arg)
                list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
                        if (tmt->tmt_batchid > committed)
                                break;
+                       __set_current_state(TASK_RUNNING);
                        list_del_init(&tmt->tmt_commit_list);
                        if (tmt->tmt_result <= 0)
-                               distribute_txn_cancel_records(&env, tmt);
+                               distribute_txn_cancel_records(env, tmt);
                        top_multiple_thandle_put(tmt);
                }
 
-               wait_event_idle(tdtd->tdtd_commit_thread_waitq,
-                               !distribute_txn_commit_thread_running(lut) ||
-                               committed < tdtd->tdtd_committed_batchid ||
-                               tdtd_ready_for_cancel_log(tdtd));
-       };
+               if (current->state)
+                       schedule();
+       }
 
-       wait_event_idle(tdtd->tdtd_commit_thread_waitq,
-                       atomic_read(&tdtd->tdtd_refcount) == 0);
+       while (({set_current_state(TASK_IDLE);
+                atomic_read(&tdtd->tdtd_refcount) != 0; }))
+               schedule();
+       __set_current_state(TASK_RUNNING);
 
        spin_lock(&tdtd->tdtd_batchid_lock);
        list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
@@ -1663,11 +1617,6 @@ static int distribute_txn_commit_thread(void *_arg)
                top_multiple_thandle_dump(tmt, D_HA);
                top_multiple_thandle_put(tmt);
        }
-
-       thread->t_flags = SVC_STOPPED;
-       lu_env_fini(&env);
-       wake_up(&thread->t_ctl_waitq);
-
        RETURN(0);
 }
 
@@ -1701,8 +1650,6 @@ int distribute_txn_init(const struct lu_env *env,
 
        tdtd->tdtd_batchid = lut->lut_last_transno + 1;
 
-       init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
-       init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
        init_waitqueue_head(&tdtd->tdtd_recovery_threads_waitq);
        atomic_set(&tdtd->tdtd_refcount, 0);
        atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
@@ -1715,14 +1662,19 @@ int distribute_txn_init(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
-       task = kthread_run(distribute_txn_commit_thread, tdtd, "dist_txn-%u",
-                          index);
-       if (IS_ERR(task))
+       rc = lu_env_init(&tdtd->tdtd_env, LCT_LOCAL | LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
+
+       task = kthread_create(distribute_txn_commit_thread, tdtd, "dist_txn-%u",
+                             index);
+       if (IS_ERR(task)) {
+               lu_env_fini(&tdtd->tdtd_env);
                RETURN(PTR_ERR(task));
+       }
+       tdtd->tdtd_commit_task = task;
+       wake_up_process(task);
 
-       wait_event_idle(lut->lut_tdtd_commit_thread.t_ctl_waitq,
-                       distribute_txn_commit_thread_running(lut) ||
-                       distribute_txn_commit_thread_stopped(lut));
        RETURN(0);
 }
 EXPORT_SYMBOL(distribute_txn_init);
@@ -1738,18 +1690,31 @@ EXPORT_SYMBOL(distribute_txn_init);
 void distribute_txn_fini(const struct lu_env *env,
                         struct target_distribute_txn_data *tdtd)
 {
-       struct lu_target *lut = tdtd->tdtd_lut;
+       struct top_multiple_thandle *tmt;
+       LIST_HEAD(list);
 
        /* Stop cancel thread */
-       if (lut == NULL || !distribute_txn_commit_thread_running(lut))
+       if (!tdtd->tdtd_commit_task)
                return;
 
+       kthread_stop(tdtd->tdtd_commit_task);
+       tdtd->tdtd_commit_task = NULL;
+
        spin_lock(&tdtd->tdtd_batchid_lock);
-       lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING;
+       list_splice_init(&tdtd->tdtd_list, &list);
        spin_unlock(&tdtd->tdtd_batchid_lock);
-       wake_up(&tdtd->tdtd_commit_thread_waitq);
-       wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
-                  lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED);
+
+       CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n",
+              tdtd->tdtd_lut->lut_obd->obd_name);
+       while ((tmt = list_first_entry_or_null(&list,
+                                              struct top_multiple_thandle,
+                                              tmt_commit_list)) != NULL) {
+               list_del_init(&tmt->tmt_commit_list);
+               top_multiple_thandle_dump(tmt, D_HA);
+               top_multiple_thandle_put(tmt);
+       }
+
+       lu_env_fini(&tdtd->tdtd_env);
 
        dtrq_list_destroy(tdtd);
        if (tdtd->tdtd_batchid_obj != NULL) {