+ OBD_FREE_PTR(tmt);
+}
+EXPORT_SYMBOL(top_multiple_thandle_destroy);
+
+/**
+ * Cancel the update log on MDTs
+ *
+ * Cancel the update log on MDTs then destroy the thandle.
+ *
+ * \param[in] env execution environment
+ * \param[in] tmt the top multiple thandle whose updates records
+ * will be cancelled.
+ *
+ * \retval 0 if cancellation succeeds.
+ * \retval negative errno if cancellation fails.
+ */
+static int distribute_txn_cancel_records(const struct lu_env *env,
+ struct top_multiple_thandle *tmt)
+{
+ struct sub_thandle *st;
+ ENTRY;
+
+ top_multiple_thandle_dump(tmt, D_INFO);
+ /* Cancel update logs on other MDTs */
+ list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+ struct llog_ctxt *ctxt;
+ struct obd_device *obd;
+ struct llog_cookie *cookie;
+ struct sub_thandle_cookie *stc;
+ int rc;
+
+ obd = st->st_dt->dd_lu_dev.ld_obd;
+ ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+ if (ctxt == NULL)
+ continue;
+ list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
+ cookie = &stc->stc_cookie;
+ if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
+ continue;
+
+ rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
+ cookie);
+ CDEBUG(D_HA, "%s: batchid %llu cancel update log "
+ DFID".%u: rc = %d\n", obd->obd_name,
+ tmt->tmt_batchid,
+ PFID(&cookie->lgc_lgl.lgl_oi.oi_fid),
+ cookie->lgc_index, rc);
+ }
+
+ llog_ctxt_put(ctxt);
+ }
+
+ RETURN(0);
+}
+
+/**
+ * Check if there are committed transaction
+ *
+ * Check if there are committed transaction in the distribute transaction
+ * list, then cancel the update records for those committed transaction.
+ * Because the distribute transaction in the list are sorted by batchid,
+ * and cancellation will be done by batchid order, so we only check the first
+ * the transaction(with lowest batchid) in the list.
+ *
+ * \param[in] lod lod device where cancel thread is
+ *
+ * \retval true if it is ready
+ * \retval false if it is not ready
+ */
+static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd)
+{
+ struct top_multiple_thandle *tmt = NULL;
+ struct obd_device *obd = tdtd->tdtd_lut->lut_obd;
+ bool ready = false;
+
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ if (!list_empty(&tdtd->tdtd_list)) {
+ tmt = list_entry(tdtd->tdtd_list.next,
+ struct top_multiple_thandle, tmt_commit_list);
+ if (tmt->tmt_committed &&
+ (!obd->obd_recovering || (obd->obd_recovering &&
+ tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)))
+ ready = true;
+ }
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+
+ return ready;
+}
+
+struct distribute_txn_bid_data {
+ struct dt_txn_commit_cb dtbd_cb;
+ struct target_distribute_txn_data *dtbd_tdtd;
+ __u64 dtbd_batchid;
+};
+
+/**
+ * callback of updating commit batchid
+ *
+ * Updating commit batchid then wake up the commit thread to cancel the
+ * records.
+ *
+ * \param[in]env execution environment
+ * \param[in]th thandle to updating commit batchid
+ * \param[in]cb commit callback
+ * \param[in]err result of thandle
+ */
+static void distribute_txn_batchid_cb(struct lu_env *env,
+ struct thandle *th,
+ struct dt_txn_commit_cb *cb,
+ int err)
+{
+ struct distribute_txn_bid_data *dtbd = NULL;
+ struct target_distribute_txn_data *tdtd;
+
+ dtbd = container_of0(cb, struct distribute_txn_bid_data, dtbd_cb);
+ tdtd = dtbd->dtbd_tdtd;
+
+ CDEBUG(D_HA, "%s: %llu batchid updated\n",
+ tdtd->tdtd_lut->lut_obd->obd_name, dtbd->dtbd_batchid);
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ if (dtbd->dtbd_batchid > tdtd->tdtd_committed_batchid &&
+ !tdtd->tdtd_lut->lut_obd->obd_no_transno)
+ tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid;
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+ atomic_dec(&tdtd->tdtd_refcount);
+ wake_up(&tdtd->tdtd_commit_thread_waitq);
+
+ OBD_FREE_PTR(dtbd);
+}
+
+/**
+ * Update the commit batchid in disk
+ *
+ * Update commit batchid in the disk, after this is committed, it can start
+ * to cancel the update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] tdtd distribute transaction structure
+ * \param[in] batchid commit batchid to be updated
+ *
+ * \retval 0 if update succeeds.
+ * \retval negative errno if update fails.
+ */
+static int
+distribute_txn_commit_batchid_update(const struct lu_env *env,
+ struct target_distribute_txn_data *tdtd,
+ __u64 batchid)
+{
+ struct distribute_txn_bid_data *dtbd = NULL;
+ struct thandle *th;
+ struct lu_buf buf;
+ __u64 tmp;
+ __u64 off;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC_PTR(dtbd);
+ if (dtbd == NULL)
+ RETURN(-ENOMEM);
+ dtbd->dtbd_batchid = batchid;
+ dtbd->dtbd_tdtd = tdtd;
+ dtbd->dtbd_cb.dcb_func = distribute_txn_batchid_cb;
+ atomic_inc(&tdtd->tdtd_refcount);
+
+ th = dt_trans_create(env, tdtd->tdtd_lut->lut_bottom);
+ if (IS_ERR(th)) {
+ atomic_dec(&tdtd->tdtd_refcount);
+ OBD_FREE_PTR(dtbd);
+ RETURN(PTR_ERR(th));
+ }
+
+ tmp = cpu_to_le64(batchid);
+ buf.lb_buf = &tmp;
+ buf.lb_len = sizeof(tmp);
+ off = 0;
+
+ rc = dt_declare_record_write(env, tdtd->tdtd_batchid_obj, &buf, off,
+ th);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_start_local(env, tdtd->tdtd_lut->lut_bottom, th);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = dt_trans_cb_add(th, &dtbd->dtbd_cb);
+ if (rc < 0)
+ GOTO(stop, rc);
+
+ rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf,
+ &off, th);
+
+ CDEBUG(D_INFO, "%s: update batchid %llu: rc = %d\n",
+ tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc);
+
+stop:
+ dt_trans_stop(env, tdtd->tdtd_lut->lut_bottom, th);
+ if (rc < 0) {
+ atomic_dec(&tdtd->tdtd_refcount);
+ OBD_FREE_PTR(dtbd);
+ }
+ RETURN(rc);
+}
+
+/**
+ * Init commit batchid for distribute transaction.
+ *
+ * Initialize the batchid object and get commit batchid from the object.
+ *
+ * \param[in] env execution environment
+ * \param[in] tdtd distribute transaction whose batchid is initialized.
+ *
+ * \retval 0 if initialization succeeds.
+ * \retval negative errno if initialization fails.
+ **/
+static int
+distribute_txn_commit_batchid_init(const struct lu_env *env,
+ struct target_distribute_txn_data *tdtd)
+{
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct lu_target *lut = tdtd->tdtd_lut;
+ struct lu_attr *attr = &tti->tti_attr;
+ struct lu_fid *fid = &tti->tti_fid1;
+ struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
+ struct dt_object *dt_obj = NULL;
+ struct lu_buf buf;
+ __u64 tmp;
+ __u64 off;
+ int rc;
+ ENTRY;
+
+ memset(attr, 0, sizeof(*attr));
+ attr->la_valid = LA_MODE;
+ attr->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+ dof->dof_type = dt_mode_to_dft(S_IFREG);
+
+ lu_local_obj_fid(fid, BATCHID_COMMITTED_OID);
+
+ dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof,
+ attr);
+ if (IS_ERR(dt_obj)) {
+ rc = PTR_ERR(dt_obj);
+ dt_obj = NULL;
+ GOTO(out_put, rc);
+ }
+
+ tdtd->tdtd_batchid_obj = dt_obj;
+
+ buf.lb_buf = &tmp;
+ buf.lb_len = sizeof(tmp);
+ off = 0;
+ rc = dt_read(env, dt_obj, &buf, &off);
+ if (rc < 0 || (rc < buf.lb_len && rc > 0)) {
+ CERROR("%s can't read last committed batchid: rc = %d\n",
+ tdtd->tdtd_lut->lut_obd->obd_name, rc);
+ if (rc > 0)
+ rc = -EINVAL;
+ GOTO(out_put, rc);
+ } else if (rc == buf.lb_len) {
+ tdtd->tdtd_committed_batchid = le64_to_cpu(tmp);
+ CDEBUG(D_HA, "%s: committed batchid %llu\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ tdtd->tdtd_committed_batchid);
+ rc = 0;
+ }
+
+out_put:
+ if (rc < 0 && dt_obj != NULL) {
+ dt_object_put(env, dt_obj);
+ tdtd->tdtd_batchid_obj = NULL;
+ }
+ return rc;
+}
+
+/**
+ * manage the distribute transaction thread
+ *
+ * Distribute transaction are linked to the list, and once the distribute
+ * transaction is committed, it will update the last committed batchid first,
+ * after it is committed, it will cancel the records.
+ *
+ * \param[in] _arg argument for commit thread
+ *
+ * \retval 0 if thread is running successfully
+ * \retval negative errno if the thread can not be run.
+ */
+static int distribute_txn_commit_thread(void *_arg)
+{
+ struct target_distribute_txn_data *tdtd = _arg;
+ struct lu_target *lut = tdtd->tdtd_lut;
+ struct ptlrpc_thread *thread = &lut->lut_tdtd_commit_thread;
+ struct l_wait_info lwi = { 0 };
+ struct lu_env env;
+ struct list_head list;
+ int rc;
+ struct top_multiple_thandle *tmt;
+ struct top_multiple_thandle *tmp;
+ __u64 batchid = 0, committed;
+
+ ENTRY;
+
+ rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
+ if (rc != 0)
+ RETURN(rc);
+
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ thread->t_flags = SVC_RUNNING;
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+ wake_up(&thread->t_ctl_waitq);
+ INIT_LIST_HEAD(&list);
+
+ CDEBUG(D_HA, "%s: start commit thread committed batchid %llu\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ tdtd->tdtd_committed_batchid);
+
+ while (distribute_txn_commit_thread_running(lut)) {
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
+ tmt_commit_list) {
+ if (tmt->tmt_committed == 0)
+ break;
+
+ /* Note: right now, replay is based on master MDT
+ * transno, but cancellation is based on batchid.
+ * so we do not try to cancel the update log until
+ * the recoverying is done, unless the update records
+ * batchid < committed_batchid. */
+ if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) {
+ list_move_tail(&tmt->tmt_commit_list, &list);
+ } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
+ LASSERTF(tmt->tmt_batchid >= batchid,
+ "tmt %p tmt_batchid: %llu, batchid "
+ "%llu\n", tmt, tmt->tmt_batchid,
+ batchid);
+ /* There are three types of distribution
+ * transaction result
+ *
+ * 1. If tmt_result < 0, it means the
+ * distribution transaction fails, which should
+ * be rare, because once declare phase succeeds,
+ * the operation should succeeds anyway. Note in
+ * this case, we will still update batchid so
+ * cancellation would be stopped.
+ *
+ * 2. If tmt_result == 0, it means the
+ * distribution transaction succeeds, and we
+ * will update batchid.
+ *
+ * 3. If tmt_result > 0, it means distribute
+ * transaction is not yet committed on every
+ * node, but we need release this tmt before
+ * that, which usuually happens during umount.
+ */
+ if (tmt->tmt_result <= 0)
+ batchid = tmt->tmt_batchid;
+ list_move_tail(&tmt->tmt_commit_list, &list);
+ }
+ }
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+
+ CDEBUG(D_HA, "%s: batchid: %llu committed batchid "
+ "%llu\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid,
+ tdtd->tdtd_committed_batchid);
+ /* update globally committed on a storage */
+ if (batchid > tdtd->tdtd_committed_batchid) {
+ rc = distribute_txn_commit_batchid_update(&env, tdtd,
+ batchid);
+ if (rc == 0)
+ batchid = 0;
+ }
+ /* cancel the records for committed batchid's */
+ /* XXX: should we postpone cancel's till the end of recovery? */
+ committed = tdtd->tdtd_committed_batchid;
+ list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
+ if (tmt->tmt_batchid > committed)
+ break;
+ list_del_init(&tmt->tmt_commit_list);
+ if (tmt->tmt_result <= 0)
+ distribute_txn_cancel_records(&env, tmt);
+ top_multiple_thandle_put(tmt);
+ }
+
+ l_wait_event(tdtd->tdtd_commit_thread_waitq,
+ !distribute_txn_commit_thread_running(lut) ||
+ committed < tdtd->tdtd_committed_batchid ||
+ tdtd_ready_for_cancel_log(tdtd), &lwi);
+ };
+
+ l_wait_event(tdtd->tdtd_commit_thread_waitq,
+ atomic_read(&tdtd->tdtd_refcount) == 0, &lwi);
+
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
+ tmt_commit_list)
+ list_move_tail(&tmt->tmt_commit_list, &list);
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+
+ CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n",
+ tdtd->tdtd_lut->lut_obd->obd_name);
+ list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
+ list_del_init(&tmt->tmt_commit_list);
+ top_multiple_thandle_dump(tmt, D_HA);
+ top_multiple_thandle_put(tmt);
+ }
+
+ thread->t_flags = SVC_STOPPED;
+ lu_env_fini(&env);
+ wake_up(&thread->t_ctl_waitq);
+
+ RETURN(0);
+}
+
+/**
+ * Start llog cancel thread
+ *
+ * Start llog cancel(master/slave) thread on LOD
+ *
+ * \param[in]lclt cancel log thread to be started.
+ *
+ * \retval 0 if the thread is started successfully.
+ * \retval negative errno if the thread is not being
+ * started.
+ */
+int distribute_txn_init(const struct lu_env *env,
+ struct lu_target *lut,
+ struct target_distribute_txn_data *tdtd,
+ __u32 index)
+{
+ struct task_struct *task;
+ struct l_wait_info lwi = { 0 };
+ int rc;
+ ENTRY;
+
+ INIT_LIST_HEAD(&tdtd->tdtd_list);
+ INIT_LIST_HEAD(&tdtd->tdtd_replay_finish_list);
+ INIT_LIST_HEAD(&tdtd->tdtd_replay_list);
+ spin_lock_init(&tdtd->tdtd_batchid_lock);
+ spin_lock_init(&tdtd->tdtd_replay_list_lock);
+ tdtd->tdtd_replay_handler = distribute_txn_replay_handle;
+ tdtd->tdtd_replay_ready = 0;
+
+ tdtd->tdtd_batchid = lut->lut_last_transno + 1;
+
+ init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
+ init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
+ init_waitqueue_head(&tdtd->tdtd_recovery_threads_waitq);
+ atomic_set(&tdtd->tdtd_refcount, 0);
+ atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
+
+ tdtd->tdtd_lut = lut;
+ if (lut->lut_bottom->dd_rdonly)
+ RETURN(0);
+
+ rc = distribute_txn_commit_batchid_init(env, tdtd);
+ if (rc != 0)
+ RETURN(rc);
+
+ task = kthread_run(distribute_txn_commit_thread, tdtd, "dist_txn-%u",
+ index);
+ if (IS_ERR(task))
+ RETURN(PTR_ERR(task));
+
+ l_wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
+ distribute_txn_commit_thread_running(lut) ||
+ distribute_txn_commit_thread_stopped(lut), &lwi);
+ RETURN(0);
+}
+EXPORT_SYMBOL(distribute_txn_init);
+
+/**
+ * Stop llog cancel thread
+ *
+ * Stop llog cancel(master/slave) thread on LOD and also destory
+ * all of transaction in the list.
+ *
+ * \param[in]lclt cancel log thread to be stopped.
+ */
+void distribute_txn_fini(const struct lu_env *env,
+ struct target_distribute_txn_data *tdtd)
+{
+ struct lu_target *lut = tdtd->tdtd_lut;
+
+ /* Stop cancel thread */
+ if (lut == NULL || !distribute_txn_commit_thread_running(lut))
+ return;
+
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING;
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+ wake_up(&tdtd->tdtd_commit_thread_waitq);
+ wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
+ lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED);
+
+ dtrq_list_destroy(tdtd);
+ if (tdtd->tdtd_batchid_obj != NULL) {
+ dt_object_put(env, tdtd->tdtd_batchid_obj);
+ tdtd->tdtd_batchid_obj = NULL;
+ }