4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2015, 2016, Intel Corporation.
26 * lustre/target/update_trans.c
28 * This file implements the update distribute transaction API.
30 * To manage the cross-MDT operation (distribute operation) transaction,
31 * the transaction will also be separated two layers on MD stack, top
32 * transaction and sub transaction.
34 * During the distribute operation, top transaction is created in the LOD
35 * layer, and represent the operation. Sub transaction is created by
36 * each OSD or OSP. Top transaction start/stop will trigger all of its sub
37 * transaction start/stop. Top transaction (the whole operation) is committed
38 * only all of its sub transaction are committed.
40 * there are three kinds of transactions
41 * 1. local transaction: All updates are in a single local OSD.
42 * 2. Remote transaction: All Updates are only in the remote OSD,
43 * i.e. locally all updates are in OSP.
44 * 3. Mixed transaction: Updates are both in local OSD and remote
47 * Author: Di Wang <di.wang@intel.com>
50 #define DEBUG_SUBSYSTEM S_CLASS
52 #include <linux/kthread.h>
53 #include <lu_target.h>
54 #include <lustre_log.h>
55 #include <lustre_update.h>
57 #include <obd_class.h>
58 #include <tgt_internal.h>
60 #include <tgt_internal.h>
62 * Dump top mulitple thandle
64 * Dump top multiple thandle and all of its sub thandle to the debug log.
66 * \param[in]mask debug mask
67 * \param[in]top_th top_thandle to be dumped
69 static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
72 struct sub_thandle *st;
74 LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
75 CDEBUG(mask, "%s tmt %p refcount %d committed %d result %d batchid %llu\n",
76 tmt->tmt_master_sub_dt ?
77 tmt->tmt_master_sub_dt->dd_lu_dev.ld_obd->obd_name :
79 tmt, atomic_read(&tmt->tmt_refcount), tmt->tmt_committed,
80 tmt->tmt_result, tmt->tmt_batchid);
82 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
83 struct sub_thandle_cookie *stc;
85 CDEBUG(mask, "st %p obd %s committed %d stopped %d sub_th %p\n",
86 st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
87 st->st_committed, st->st_stopped, st->st_sub_th);
89 list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
90 CDEBUG(mask, " cookie "DFID".%u\n",
91 PFID(&stc->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
92 stc->stc_cookie.lgc_index);
98 * Declare write update to sub device
100 * Declare Write updates llog records to the sub device during distribute
103 * \param[in] env execution environment
104 * \param[in] record update records being written
105 * \param[in] sub_th sub transaction handle
106 * \param[in] record_size total update record size
108 * \retval 0 if writing succeeds
109 * \retval negative errno if writing fails
111 static int sub_declare_updates_write(const struct lu_env *env,
112 struct llog_update_record *record,
113 struct thandle *sub_th, size_t record_size)
115 struct llog_ctxt *ctxt;
116 struct dt_device *dt = sub_th->th_dev;
117 int left = record_size;
120 /* If ctxt is NULL, it means not need to write update,
121 * for example if the the OSP is used to connect to OST */
122 ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
123 LLOG_UPDATELOG_ORIG_CTXT);
125 /* Not ready to record updates yet. */
126 if (ctxt == NULL || ctxt->loc_handle == NULL) {
131 rc = llog_declare_add(env, ctxt->loc_handle,
132 &record->lur_hdr, sub_th);
136 while (left > ctxt->loc_chunk_size) {
137 rc = llog_declare_add(env, ctxt->loc_handle,
138 &record->lur_hdr, sub_th);
142 left -= ctxt->loc_chunk_size;
152 * write update to sub device
154 * Write llog update record to the sub device during distribute
155 * transaction. If it succeeds, llog cookie of the record will be
156 * returned by @cookie.
158 * \param[in] env execution environment
159 * \param[in] record update records being written
160 * \param[in] sub_th sub transaction handle
161 * \param[out] cookie llog cookie of the update record.
163 * \retval 1 if writing succeeds
164 * \retval negative errno if writing fails
166 static int sub_updates_write(const struct lu_env *env,
167 struct llog_update_record *record,
168 struct sub_thandle *sub_th)
170 struct dt_device *dt = sub_th->st_dt;
171 struct llog_ctxt *ctxt;
172 struct llog_update_record *lur = NULL;
173 __u32 update_count = 0;
174 __u32 param_count = 0;
175 __u32 last_update_count = 0;
176 __u32 last_param_count = 0;
180 struct sub_thandle_cookie *stc;
186 ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
187 LLOG_UPDATELOG_ORIG_CTXT);
188 /* If ctxt == NULL, then it means updates on OST (only happens
189 * during migration), and we do not track those updates for now */
190 /* If ctxt->loc_handle == NULL, then it does not need to record
191 * update, usually happens in error handler path */
192 if (ctxt == NULL || ctxt->loc_handle == NULL) {
197 /* Since the cross-MDT updates will includes both local
198 * and remote updates, the update ops count must > 1 */
199 LASSERT(record->lur_update_rec.ur_update_count > 1);
200 LASSERTF(record->lur_hdr.lrh_len == llog_update_record_size(record),
201 "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len,
202 llog_update_record_size(record));
204 if (likely(record->lur_hdr.lrh_len <= ctxt->loc_chunk_size)) {
207 GOTO(llog_put, rc = -ENOMEM);
208 INIT_LIST_HEAD(&stc->stc_list);
210 rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
211 &stc->stc_cookie, sub_th->st_sub_th);
213 CDEBUG(D_INFO, "%s: Add update log "DFID".%u: rc = %d\n",
214 dt->dd_lu_dev.ld_obd->obd_name,
215 PFID(&stc->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
216 stc->stc_cookie.lgc_index, rc);
219 list_add(&stc->stc_list, &sub_th->st_cookie_list);
228 /* Split the records into chunk_size update record */
229 OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size);
231 GOTO(llog_put, rc = -ENOMEM);
233 memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
234 lur->lur_update_rec.ur_update_count = 0;
235 lur->lur_update_rec.ur_param_count = 0;
236 start = (char *)&record->lur_update_rec.ur_ops;
239 if (update_count < record->lur_update_rec.ur_update_count)
240 next = (char *)update_op_next_op(
241 (struct update_op *)cur);
242 else if (param_count < record->lur_update_rec.ur_param_count)
243 next = (char *)update_param_next_param(
244 (struct update_param *)cur);
249 * If its size > llog chunk_size, then write current chunk to
250 * the update llog, NB the padding should >= LLOG_MIN_REC_SIZE.
252 * So check padding length is either >= LLOG_MIN_REC_SIZE or is
253 * 0 (record length just matches the chunk size).
255 reclen = __llog_update_record_size(
256 __update_records_size(next - start));
257 if ((reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size ||
258 reclen == ctxt->loc_chunk_size) &&
263 record->lur_update_rec.ur_update_count)
265 else if (param_count <
266 record->lur_update_rec.ur_param_count)
271 lur->lur_update_rec.ur_update_count = update_count -
273 lur->lur_update_rec.ur_param_count = param_count -
275 memcpy(&lur->lur_update_rec.ur_ops, start, cur - start);
276 lur->lur_hdr.lrh_len = llog_update_record_size(lur);
278 LASSERT(lur->lur_hdr.lrh_len ==
279 __llog_update_record_size(
280 __update_records_size(cur - start)));
281 LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
283 update_records_dump(&lur->lur_update_rec, D_INFO, true);
287 GOTO(llog_put, rc = -ENOMEM);
288 INIT_LIST_HEAD(&stc->stc_list);
290 rc = llog_add(env, ctxt->loc_handle, &lur->lur_hdr,
291 &stc->stc_cookie, sub_th->st_sub_th);
293 CDEBUG(D_INFO, "%s: Add update log "DFID".%u: rc = %d\n",
294 dt->dd_lu_dev.ld_obd->obd_name,
295 PFID(&stc->stc_cookie.lgc_lgl.lgl_oi.oi_fid),
296 stc->stc_cookie.lgc_index, rc);
299 list_add(&stc->stc_list, &sub_th->st_cookie_list);
306 last_update_count = update_count;
307 last_param_count = param_count;
309 lur->lur_update_rec.ur_update_count = 0;
310 lur->lur_update_rec.ur_param_count = 0;
311 lur->lur_update_rec.ur_flags |= UPDATE_RECORD_CONTINUE;
316 OBD_FREE_LARGE(lur, ctxt->loc_chunk_size);
323 * Prepare the update records.
325 * Merge params and ops into the update records, then initializing
328 * During transaction execution phase, parameters and update ops
329 * are collected in two different buffers (see lod_updates_pack()),
330 * during transaction stop, it needs to be merged in one buffer,
331 * so it will be written in the update log.
333 * \param[in] env execution environment
334 * \param[in] tmt top_multiple_thandle for distribute txn
336 * \retval 0 if merging succeeds.
337 * \retval negaitive errno if merging fails.
339 static int prepare_writing_updates(const struct lu_env *env,
340 struct top_multiple_thandle *tmt)
342 struct thandle_update_records *tur = tmt->tmt_update_records;
343 struct llog_update_record *lur;
344 struct update_params *params;
348 if (tur == NULL || tur->tur_update_records == NULL ||
349 tur->tur_update_params == NULL)
352 lur = tur->tur_update_records;
353 /* Extends the update records buffer if needed */
354 params_size = update_params_size(tur->tur_update_params,
355 tur->tur_update_param_count);
356 LASSERT(lur->lur_update_rec.ur_param_count == 0);
357 update_size = llog_update_record_size(lur);
358 if (cfs_size_round(update_size + params_size) >
359 tur->tur_update_records_buf_size) {
362 rc = tur_update_records_extend(tur,
363 cfs_size_round(update_size + params_size));
367 lur = tur->tur_update_records;
370 params = update_records_get_params(&lur->lur_update_rec);
371 memcpy(params, tur->tur_update_params, params_size);
373 lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
374 lur->lur_update_rec.ur_batchid = tmt->tmt_batchid;
375 /* Init update record header */
376 lur->lur_hdr.lrh_len = llog_update_record_size(lur);
377 lur->lur_hdr.lrh_type = UPDATE_REC;
379 /* Dump updates for debugging purpose */
380 update_records_dump(&lur->lur_update_rec, D_INFO, true);
386 distribute_txn_commit_thread_running(struct lu_target *lut)
388 return lut->lut_tdtd_commit_thread.t_flags & SVC_RUNNING;
392 distribute_txn_commit_thread_stopped(struct lu_target *lut)
394 return lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED;
398 * Top thandle commit callback
400 * This callback will be called when all of sub transactions are committed.
402 * \param[in] th top thandle to be committed.
404 static void top_trans_committed_cb(struct top_multiple_thandle *tmt)
406 struct lu_target *lut;
409 LASSERT(atomic_read(&tmt->tmt_refcount) > 0);
411 top_multiple_thandle_dump(tmt, D_HA);
412 tmt->tmt_committed = 1;
413 lut = dt2lu_dev(tmt->tmt_master_sub_dt)->ld_site->ls_tgt;
414 if (distribute_txn_commit_thread_running(lut))
415 wake_up(&lut->lut_tdtd->tdtd_commit_thread_waitq);
419 struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
420 struct dt_device *dt_dev)
422 struct sub_thandle *st;
424 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
425 if (st->st_dt == dt_dev)
430 EXPORT_SYMBOL(lookup_sub_thandle);
432 struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt,
433 struct dt_device *dt_dev)
435 struct sub_thandle *st;
439 RETURN(ERR_PTR(-ENOMEM));
441 INIT_LIST_HEAD(&st->st_sub_list);
442 INIT_LIST_HEAD(&st->st_cookie_list);
445 list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list);
449 static void sub_trans_commit_cb_internal(struct top_multiple_thandle *tmt,
450 struct thandle *sub_th, int err)
452 struct sub_thandle *st;
453 bool all_committed = true;
455 /* Check if all sub thandles are committed */
456 spin_lock(&tmt->tmt_sub_lock);
457 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
458 if (st->st_sub_th == sub_th) {
459 st->st_committed = 1;
462 if (!st->st_committed)
463 all_committed = false;
465 spin_unlock(&tmt->tmt_sub_lock);
467 if (tmt->tmt_result == 0)
468 tmt->tmt_result = err;
471 top_trans_committed_cb(tmt);
473 top_multiple_thandle_dump(tmt, D_INFO);
474 top_multiple_thandle_put(tmt);
479 * sub thandle commit callback
481 * Mark the sub thandle to be committed and if all sub thandle are committed
482 * notify the top thandle.
484 * \param[in] env execution environment
485 * \param[in] sub_th sub thandle being committed
486 * \param[in] cb commit callback
487 * \param[in] err trans result
489 static void sub_trans_commit_cb(struct lu_env *env,
490 struct thandle *sub_th,
491 struct dt_txn_commit_cb *cb, int err)
493 struct top_multiple_thandle *tmt = cb->dcb_data;
495 sub_trans_commit_cb_internal(tmt, sub_th, err);
498 static void sub_thandle_register_commit_cb(struct sub_thandle *st,
499 struct top_multiple_thandle *tmt)
501 LASSERT(st->st_sub_th != NULL);
502 top_multiple_thandle_get(tmt);
503 st->st_commit_dcb.dcb_func = sub_trans_commit_cb;
504 st->st_commit_dcb.dcb_data = tmt;
505 INIT_LIST_HEAD(&st->st_commit_dcb.dcb_linkage);
506 dt_trans_cb_add(st->st_sub_th, &st->st_commit_dcb);
510 * Sub thandle stop call back
512 * After sub thandle is stopped, it will call this callback to notify
515 * \param[in] th sub thandle to be stopped
516 * \param[in] rc result of sub trans
518 static void sub_trans_stop_cb(struct lu_env *env,
519 struct thandle *sub_th,
520 struct dt_txn_commit_cb *cb, int err)
522 struct sub_thandle *st;
523 struct top_multiple_thandle *tmt = cb->dcb_data;
526 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
530 if (st->st_dt == sub_th->th_dev) {
537 wake_up(&tmt->tmt_stop_waitq);
541 static void sub_thandle_register_stop_cb(struct sub_thandle *st,
542 struct top_multiple_thandle *tmt)
544 st->st_stop_dcb.dcb_func = sub_trans_stop_cb;
545 st->st_stop_dcb.dcb_data = tmt;
546 st->st_stop_dcb.dcb_flags = DCB_TRANS_STOP;
547 INIT_LIST_HEAD(&st->st_stop_dcb.dcb_linkage);
548 dt_trans_cb_add(st->st_sub_th, &st->st_stop_dcb);
554 * Create transaction handle for sub_thandle
556 * \param[in] env execution environment
557 * \param[in] th top thandle
558 * \param[in] st sub_thandle
560 * \retval 0 if creation succeeds.
561 * \retval negative errno if creation fails.
563 int sub_thandle_trans_create(const struct lu_env *env,
564 struct top_thandle *top_th,
565 struct sub_thandle *st)
567 struct thandle *sub_th;
569 sub_th = dt_trans_create(env, st->st_dt);
571 return PTR_ERR(sub_th);
573 sub_th->th_top = &top_th->tt_super;
574 st->st_sub_th = sub_th;
576 sub_th->th_wait_submit = 1;
577 sub_thandle_register_stop_cb(st, top_th->tt_multiple_thandle);
582 * Create the top transaction.
584 * Create the top transaction on the master device. It will create a top
585 * thandle and a sub thandle on the master device.
587 * \param[in] env execution environment
588 * \param[in] master_dev master_dev the top thandle will be created
590 * \retval pointer to the created thandle.
591 * \retval ERR_PTR(errno) if creation failed.
594 top_trans_create(const struct lu_env *env, struct dt_device *master_dev)
596 struct top_thandle *top_th;
597 struct thandle *child_th;
599 OBD_ALLOC_GFP(top_th, sizeof(*top_th), __GFP_IO);
601 return ERR_PTR(-ENOMEM);
603 top_th->tt_super.th_top = &top_th->tt_super;
605 if (master_dev != NULL) {
606 child_th = dt_trans_create(env, master_dev);
607 if (IS_ERR(child_th)) {
608 OBD_FREE_PTR(top_th);
612 child_th->th_top = &top_th->tt_super;
613 child_th->th_wait_submit = 1;
614 top_th->tt_master_sub_thandle = child_th;
616 top_th->tt_super.th_tags |= child_th->th_tags;
618 return &top_th->tt_super;
620 EXPORT_SYMBOL(top_trans_create);
623 * Declare write update transaction
625 * Check if there are updates being recorded in this transaction,
626 * it will write the record into the disk.
628 * \param[in] env execution environment
629 * \param[in] tmt top multiple transaction handle
631 * \retval 0 if writing succeeds
632 * \retval negative errno if writing fails
634 static int declare_updates_write(const struct lu_env *env,
635 struct top_multiple_thandle *tmt)
637 struct llog_update_record *record;
638 struct sub_thandle *st;
641 record = tmt->tmt_update_records->tur_update_records;
642 /* Declare update write for all other target */
643 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
644 if (st->st_sub_th == NULL)
647 rc = sub_declare_updates_write(env, record, st->st_sub_th,
648 tmt->tmt_record_size);
657 * Assign batchid to the distribute transaction.
659 * Assign batchid to the distribute transaction
661 * \param[in] tmt distribute transaction
663 static void distribute_txn_assign_batchid(struct top_multiple_thandle *new)
665 struct target_distribute_txn_data *tdtd;
666 struct dt_device *dt = new->tmt_master_sub_dt;
667 struct sub_thandle *st;
670 tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd;
671 spin_lock(&tdtd->tdtd_batchid_lock);
672 new->tmt_batchid = tdtd->tdtd_batchid++;
673 list_add_tail(&new->tmt_commit_list, &tdtd->tdtd_list);
674 spin_unlock(&tdtd->tdtd_batchid_lock);
675 list_for_each_entry(st, &new->tmt_sub_thandle_list, st_sub_list) {
676 if (st->st_sub_th != NULL)
677 sub_thandle_register_commit_cb(st, new);
679 top_multiple_thandle_get(new);
680 top_multiple_thandle_dump(new, D_INFO);
684 * Insert distribute transaction to the distribute txn list.
686 * Insert distribute transaction to the distribute txn list.
688 * \param[in] new the distribute txn to be inserted.
690 void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new)
692 struct dt_device *dt = new->tmt_master_sub_dt;
693 struct top_multiple_thandle *tmt;
694 struct target_distribute_txn_data *tdtd;
695 struct sub_thandle *st;
696 bool at_head = false;
699 tdtd = dt2lu_dev(dt)->ld_site->ls_tgt->lut_tdtd;
701 spin_lock(&tdtd->tdtd_batchid_lock);
702 list_for_each_entry_reverse(tmt, &tdtd->tdtd_list, tmt_commit_list) {
703 if (new->tmt_batchid > tmt->tmt_batchid) {
704 list_add(&new->tmt_commit_list, &tmt->tmt_commit_list);
708 if (list_empty(&new->tmt_commit_list)) {
710 list_add(&new->tmt_commit_list, &tdtd->tdtd_list);
712 spin_unlock(&tdtd->tdtd_batchid_lock);
714 list_for_each_entry(st, &new->tmt_sub_thandle_list, st_sub_list) {
715 if (st->st_sub_th != NULL)
716 sub_thandle_register_commit_cb(st, new);
719 top_multiple_thandle_get(new);
720 top_multiple_thandle_dump(new, D_INFO);
721 if (new->tmt_committed && at_head)
722 wake_up(&tdtd->tdtd_commit_thread_waitq);
726 * Prepare cross-MDT operation.
728 * Create the update record buffer to record updates for cross-MDT operation,
729 * add master sub transaction to tt_sub_trans_list, and declare the update
732 * During updates packing, all of parameters will be packed in
733 * tur_update_params, and updates will be packed in tur_update_records.
734 * Then in transaction stop, parameters and updates will be merged
735 * into one updates buffer.
737 * And also master thandle will be added to the sub_th list, so it will be
738 * easy to track the commit status.
740 * \param[in] env execution environment
741 * \param[in] th top transaction handle
743 * \retval 0 if preparation succeeds.
744 * \retval negative errno if preparation fails.
746 static int prepare_multiple_node_trans(const struct lu_env *env,
747 struct top_multiple_thandle *tmt)
749 struct thandle_update_records *tur;
753 if (tmt->tmt_update_records == NULL) {
754 tur = &update_env_info(env)->uti_tur;
755 rc = check_and_prepare_update_record(env, tur);
759 tmt->tmt_update_records = tur;
760 distribute_txn_assign_batchid(tmt);
763 rc = declare_updates_write(env, tmt);
769 * start the top transaction.
771 * Start all of its sub transactions, then start master sub transaction.
773 * \param[in] env execution environment
774 * \param[in] master_dev master_dev the top thandle will be start
775 * \param[in] th top thandle
777 * \retval 0 if transaction start succeeds.
778 * \retval negative errno if start fails.
780 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
783 struct top_thandle *top_th = container_of(th, struct top_thandle,
785 struct sub_thandle *st;
786 struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
792 top_th->tt_master_sub_thandle->th_sync = th->th_sync;
794 top_th->tt_master_sub_thandle->th_local = th->th_local;
795 top_th->tt_master_sub_thandle->th_tags = th->th_tags;
796 rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev,
797 top_th->tt_master_sub_thandle);
801 tmt = top_th->tt_multiple_thandle;
802 rc = prepare_multiple_node_trans(env, tmt);
806 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
807 if (st->st_sub_th == NULL)
810 st->st_sub_th->th_sync = th->th_sync;
812 st->st_sub_th->th_local = th->th_local;
813 st->st_sub_th->th_tags = th->th_tags;
814 rc = dt_trans_start(env, st->st_sub_th->th_dev,
819 LASSERT(st->st_started == 0);
826 EXPORT_SYMBOL(top_trans_start);
829 * Check whether we need write updates record
831 * Check if the updates for the top_thandle needs to be writen
832 * to all targets. Only if the transaction succeeds and the updates
833 * number > 2, it will write the updates,
835 * \params [in] top_th top thandle.
837 * \retval true if it needs to write updates
838 * \retval false if it does not need to write updates
840 static bool top_check_write_updates(struct top_thandle *top_th)
842 struct top_multiple_thandle *tmt;
843 struct thandle_update_records *tur;
845 /* Do not write updates to records if the transaction fails */
846 if (top_th->tt_super.th_result != 0)
849 tmt = top_th->tt_multiple_thandle;
853 tur = tmt->tmt_update_records;
857 /* Hmm, false update records, since the cross-MDT operation
858 * should includes both local and remote updates, so the
859 * updates count should >= 2 */
860 if (tur->tur_update_records == NULL ||
861 tur->tur_update_records->lur_update_rec.ur_update_count <= 1)
868 * Check if top transaction is stopped
870 * Check if top transaction is stopped, only if all sub transaction
871 * is stopped, then the top transaction is stopped.
873 * \param [in] top_th top thandle
875 * \retval true if the top transaction is stopped.
876 * \retval false if the top transaction is not stopped.
878 static bool top_trans_is_stopped(struct top_thandle *top_th)
880 struct top_multiple_thandle *tmt;
881 struct sub_thandle *st;
882 bool all_stopped = true;
884 tmt = top_th->tt_multiple_thandle;
885 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
886 if (!st->st_stopped && st->st_sub_th != NULL) {
891 if (st->st_result != 0 &&
892 top_th->tt_super.th_result == 0)
893 top_th->tt_super.th_result = st->st_result;
900 * Wait result of top transaction
902 * Wait until all sub transaction get its result.
904 * \param [in] top_th top thandle.
906 * \retval the result of top thandle.
908 static int top_trans_wait_result(struct top_thandle *top_th)
910 struct l_wait_info lwi = {0};
912 l_wait_event(top_th->tt_multiple_thandle->tmt_stop_waitq,
913 top_trans_is_stopped(top_th), &lwi);
915 RETURN(top_th->tt_super.th_result);
919 * Stop the top transaction.
921 * Stop the transaction on the master device first, then stop transactions
922 * on other sub devices.
924 * \param[in] env execution environment
925 * \param[in] master_dev master_dev the top thandle will be created
926 * \param[in] th top thandle
928 * \retval 0 if stop transaction succeeds.
929 * \retval negative errno if stop transaction fails.
931 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
934 struct top_thandle *top_th = container_of(th, struct top_thandle,
936 struct sub_thandle *st;
937 struct sub_thandle *master_st;
938 struct top_multiple_thandle *tmt;
939 struct thandle_update_records *tur;
940 bool write_updates = false;
944 if (likely(top_th->tt_multiple_thandle == NULL)) {
945 LASSERT(master_dev != NULL);
948 top_th->tt_master_sub_thandle->th_sync = th->th_sync;
950 top_th->tt_master_sub_thandle->th_local = th->th_local;
951 top_th->tt_master_sub_thandle->th_tags = th->th_tags;
952 rc = dt_trans_stop(env, master_dev,
953 top_th->tt_master_sub_thandle);
954 OBD_FREE_PTR(top_th);
958 tmt = top_th->tt_multiple_thandle;
959 tur = tmt->tmt_update_records;
961 /* Note: we need stop the master thandle first, then the stop
962 * callback will fill the master transno in the update logs,
963 * then these update logs will be sent to other MDTs */
964 /* get the master sub thandle */
965 master_st = lookup_sub_thandle(tmt, tmt->tmt_master_sub_dt);
966 write_updates = top_check_write_updates(top_th);
968 /* Step 1: write the updates log on Master MDT */
969 if (master_st != NULL && master_st->st_sub_th != NULL &&
971 struct llog_update_record *lur;
973 /* Merge the parameters and updates into one buffer */
974 rc = prepare_writing_updates(env, tmt);
976 CERROR("%s: cannot prepare updates: rc = %d\n",
977 master_dev->dd_lu_dev.ld_obd->obd_name, rc);
979 write_updates = false;
980 GOTO(stop_master_trans, rc);
983 lur = tur->tur_update_records;
984 /* Write updates to the master MDT */
985 rc = sub_updates_write(env, lur, master_st);
987 /* Cleanup the common parameters in the update records,
988 * master transno callback might add more parameters.
989 * and we need merge the update records again in the
991 if (tur->tur_update_params != NULL)
992 lur->lur_update_rec.ur_param_count = 0;
995 CERROR("%s: write updates failed: rc = %d\n",
996 master_dev->dd_lu_dev.ld_obd->obd_name, rc);
998 write_updates = false;
999 GOTO(stop_master_trans, rc);
1004 /* Step 2: Stop the transaction on the master MDT, and fill the
1005 * master transno in the update logs to other MDT. */
1006 if (master_st != NULL && master_st->st_sub_th != NULL) {
1008 master_st->st_sub_th->th_local = th->th_local;
1010 master_st->st_sub_th->th_sync = th->th_sync;
1011 master_st->st_sub_th->th_tags = th->th_tags;
1012 master_st->st_sub_th->th_result = th->th_result;
1013 rc = dt_trans_stop(env, master_st->st_dt, master_st->st_sub_th);
1014 /* If it does not write_updates, then we call submit callback
1015 * here, otherwise callback is done through
1016 * osd(osp)_trans_commit_cb() */
1017 if (!master_st->st_started &&
1018 !list_empty(&tmt->tmt_commit_list))
1019 sub_trans_commit_cb_internal(tmt,
1020 master_st->st_sub_th, rc);
1023 GOTO(stop_other_trans, rc);
1024 } else if (tur != NULL && tur->tur_update_records != NULL) {
1025 struct llog_update_record *lur;
1027 lur = tur->tur_update_records;
1028 if (lur->lur_update_rec.ur_master_transno == 0)
1029 /* Update master transno after master stop
1031 lur->lur_update_rec.ur_master_transno =
1032 tgt_th_info(env)->tti_transno;
1036 /* Step 3: write updates to other MDTs */
1037 if (write_updates) {
1038 struct llog_update_record *lur;
1040 /* Stop callback of master will add more updates and also update
1041 * master transno, so merge the parameters and updates into one
1043 rc = prepare_writing_updates(env, tmt);
1045 CERROR("%s: prepare updates failed: rc = %d\n",
1046 master_dev->dd_lu_dev.ld_obd->obd_name, rc);
1048 GOTO(stop_other_trans, rc);
1050 lur = tur->tur_update_records;
1051 list_for_each_entry(st, &tmt->tmt_sub_thandle_list,
1053 if (st->st_sub_th == NULL || st == master_st ||
1054 st->st_sub_th->th_result < 0)
1057 rc = sub_updates_write(env, lur, st);
1066 /* Step 4: Stop the transaction on other MDTs */
1067 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
1068 if (st == master_st || st->st_sub_th == NULL)
1072 st->st_sub_th->th_sync = th->th_sync;
1074 st->st_sub_th->th_local = th->th_local;
1075 st->st_sub_th->th_tags = th->th_tags;
1076 st->st_sub_th->th_result = th->th_result;
1077 rc = dt_trans_stop(env, st->st_sub_th->th_dev,
1079 if (unlikely(rc < 0 && th->th_result == 0))
1083 rc = top_trans_wait_result(top_th);
1085 tmt->tmt_result = rc;
1087 /* Balance for the refcount in top_trans_create, Note: if it is NOT
1088 * multiple node transaction, the top transaction will be destroyed. */
1089 top_multiple_thandle_put(tmt);
1090 OBD_FREE_PTR(top_th);
1093 EXPORT_SYMBOL(top_trans_stop);
1096 * Create top_multiple_thandle for top_thandle
1098 * Create top_mutilple_thandle to manage the mutiple node transaction
1099 * for top_thandle, and it also needs to add master sub thandle to the
1100 * sub trans list now.
1102 * \param[in] env execution environment
1103 * \param[in] top_th the top thandle
1105 * \retval 0 if creation succeeds
1106 * \retval negative errno if creation fails
1108 int top_trans_create_tmt(const struct lu_env *env,
1109 struct top_thandle *top_th)
1111 struct top_multiple_thandle *tmt;
1117 tmt->tmt_magic = TOP_THANDLE_MAGIC;
1118 INIT_LIST_HEAD(&tmt->tmt_sub_thandle_list);
1119 INIT_LIST_HEAD(&tmt->tmt_commit_list);
1120 atomic_set(&tmt->tmt_refcount, 1);
1121 spin_lock_init(&tmt->tmt_sub_lock);
1122 init_waitqueue_head(&tmt->tmt_stop_waitq);
1124 top_th->tt_multiple_thandle = tmt;
1129 static struct sub_thandle *
1130 create_sub_thandle_with_thandle(struct top_thandle *top_th,
1131 struct thandle *sub_th)
1133 struct sub_thandle *st;
1135 /* create and init sub th to the top trans list */
1136 st = create_sub_thandle(top_th->tt_multiple_thandle,
1141 st->st_sub_th = sub_th;
1143 sub_th->th_top = &top_th->tt_super;
1144 sub_thandle_register_stop_cb(st, top_th->tt_multiple_thandle);
1151 * Get sub thandle from the top thandle according to the sub dt_device.
1153 * \param[in] env execution environment
1154 * \param[in] th thandle on the top layer.
1155 * \param[in] sub_dt sub dt_device used to get sub transaction
1157 * \retval thandle of sub transaction if succeed
1158 * \retval PTR_ERR(errno) if failed
1160 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
1162 struct dt_device *sub_dt)
1164 struct sub_thandle *st = NULL;
1165 struct sub_thandle *master_st = NULL;
1166 struct top_thandle *top_th;
1167 struct thandle *sub_th = NULL;
1171 top_th = container_of(th, struct top_thandle, tt_super);
1173 if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev))
1174 RETURN(top_th->tt_master_sub_thandle);
1176 if (top_th->tt_multiple_thandle != NULL) {
1177 st = lookup_sub_thandle(top_th->tt_multiple_thandle, sub_dt);
1179 RETURN(st->st_sub_th);
1182 sub_th = dt_trans_create(env, sub_dt);
1186 /* Create top_multiple_thandle if necessary */
1187 if (top_th->tt_multiple_thandle == NULL) {
1188 struct top_multiple_thandle *tmt;
1190 rc = top_trans_create_tmt(env, top_th);
1192 GOTO(stop_trans, rc);
1194 tmt = top_th->tt_multiple_thandle;
1196 /* Add master sub th to the top trans list */
1197 tmt->tmt_master_sub_dt =
1198 top_th->tt_master_sub_thandle->th_dev;
1199 master_st = create_sub_thandle_with_thandle(top_th,
1200 top_th->tt_master_sub_thandle);
1201 if (IS_ERR(master_st)) {
1202 rc = PTR_ERR(master_st);
1204 GOTO(stop_trans, rc);
1208 /* create and init sub th to the top trans list */
1209 st = create_sub_thandle_with_thandle(top_th, sub_th);
1213 GOTO(stop_trans, rc);
1215 st->st_sub_th->th_wait_submit = 1;
1218 if (master_st != NULL) {
1219 list_del(&master_st->st_sub_list);
1220 OBD_FREE_PTR(master_st);
1222 sub_th->th_result = rc;
1223 dt_trans_stop(env, sub_dt, sub_th);
1224 sub_th = ERR_PTR(rc);
1229 EXPORT_SYMBOL(thandle_get_sub_by_dt);
1232 * Top multiple thandle destroy
1234 * Destroy multiple thandle and all its sub thandle.
1236 * \param[in] tmt top_multiple_thandle to be destroyed.
1238 void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt)
1240 struct sub_thandle *st;
1241 struct sub_thandle *tmp;
1243 LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
1244 list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list,
1246 struct sub_thandle_cookie *stc;
1247 struct sub_thandle_cookie *tmp;
1249 list_del(&st->st_sub_list);
1250 list_for_each_entry_safe(stc, tmp, &st->st_cookie_list,
1252 list_del(&stc->stc_list);
1259 EXPORT_SYMBOL(top_multiple_thandle_destroy);
1262 * Cancel the update log on MDTs
1264 * Cancel the update log on MDTs then destroy the thandle.
1266 * \param[in] env execution environment
1267 * \param[in] tmt the top multiple thandle whose updates records
1268 * will be cancelled.
1270 * \retval 0 if cancellation succeeds.
1271 * \retval negative errno if cancellation fails.
1273 static int distribute_txn_cancel_records(const struct lu_env *env,
1274 struct top_multiple_thandle *tmt)
1276 struct sub_thandle *st;
1279 top_multiple_thandle_dump(tmt, D_INFO);
1280 /* Cancel update logs on other MDTs */
1281 list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
1282 struct llog_ctxt *ctxt;
1283 struct obd_device *obd;
1284 struct llog_cookie *cookie;
1285 struct sub_thandle_cookie *stc;
1288 obd = st->st_dt->dd_lu_dev.ld_obd;
1289 ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
1292 list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
1293 cookie = &stc->stc_cookie;
1294 if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
1297 rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
1299 CDEBUG(D_HA, "%s: batchid %llu cancel update log "
1300 DFID".%u: rc = %d\n", obd->obd_name,
1302 PFID(&cookie->lgc_lgl.lgl_oi.oi_fid),
1303 cookie->lgc_index, rc);
1306 llog_ctxt_put(ctxt);
1313 * Check if there are committed transaction
1315 * Check if there are committed transaction in the distribute transaction
1316 * list, then cancel the update records for those committed transaction.
1317 * Because the distribute transaction in the list are sorted by batchid,
1318 * and cancellation will be done by batchid order, so we only check the first
1319 * the transaction(with lowest batchid) in the list.
1321 * \param[in] lod lod device where cancel thread is
1323 * \retval true if it is ready
1324 * \retval false if it is not ready
1326 static bool tdtd_ready_for_cancel_log(struct target_distribute_txn_data *tdtd)
1328 struct top_multiple_thandle *tmt = NULL;
1329 struct obd_device *obd = tdtd->tdtd_lut->lut_obd;
1332 spin_lock(&tdtd->tdtd_batchid_lock);
1333 if (!list_empty(&tdtd->tdtd_list)) {
1334 tmt = list_entry(tdtd->tdtd_list.next,
1335 struct top_multiple_thandle, tmt_commit_list);
1336 if (tmt->tmt_committed &&
1337 (!obd->obd_recovering || (obd->obd_recovering &&
1338 tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)))
1341 spin_unlock(&tdtd->tdtd_batchid_lock);
1346 struct distribute_txn_bid_data {
1347 struct dt_txn_commit_cb dtbd_cb;
1348 struct target_distribute_txn_data *dtbd_tdtd;
1353 * callback of updating commit batchid
1355 * Updating commit batchid then wake up the commit thread to cancel the
1358 * \param[in]env execution environment
1359 * \param[in]th thandle to updating commit batchid
1360 * \param[in]cb commit callback
1361 * \param[in]err result of thandle
1363 static void distribute_txn_batchid_cb(struct lu_env *env,
1365 struct dt_txn_commit_cb *cb,
1368 struct distribute_txn_bid_data *dtbd = NULL;
1369 struct target_distribute_txn_data *tdtd;
1371 dtbd = container_of0(cb, struct distribute_txn_bid_data, dtbd_cb);
1372 tdtd = dtbd->dtbd_tdtd;
1374 CDEBUG(D_HA, "%s: %llu batchid updated\n",
1375 tdtd->tdtd_lut->lut_obd->obd_name, dtbd->dtbd_batchid);
1376 spin_lock(&tdtd->tdtd_batchid_lock);
1377 if (dtbd->dtbd_batchid > tdtd->tdtd_committed_batchid &&
1378 !tdtd->tdtd_lut->lut_obd->obd_no_transno)
1379 tdtd->tdtd_committed_batchid = dtbd->dtbd_batchid;
1380 spin_unlock(&tdtd->tdtd_batchid_lock);
1381 atomic_dec(&tdtd->tdtd_refcount);
1382 wake_up(&tdtd->tdtd_commit_thread_waitq);
1388 * Update the commit batchid in disk
1390 * Update commit batchid in the disk, after this is committed, it can start
1391 * to cancel the update records.
1393 * \param[in] env execution environment
1394 * \param[in] tdtd distribute transaction structure
1395 * \param[in] batchid commit batchid to be updated
1397 * \retval 0 if update succeeds.
1398 * \retval negative errno if update fails.
1401 distribute_txn_commit_batchid_update(const struct lu_env *env,
1402 struct target_distribute_txn_data *tdtd,
1405 struct distribute_txn_bid_data *dtbd = NULL;
1413 OBD_ALLOC_PTR(dtbd);
1416 dtbd->dtbd_batchid = batchid;
1417 dtbd->dtbd_tdtd = tdtd;
1418 dtbd->dtbd_cb.dcb_func = distribute_txn_batchid_cb;
1419 atomic_inc(&tdtd->tdtd_refcount);
1421 th = dt_trans_create(env, tdtd->tdtd_lut->lut_bottom);
1424 RETURN(PTR_ERR(th));
1427 tmp = cpu_to_le64(batchid);
1429 buf.lb_len = sizeof(tmp);
1432 rc = dt_declare_record_write(env, tdtd->tdtd_batchid_obj, &buf, off,
1437 rc = dt_trans_start_local(env, tdtd->tdtd_lut->lut_bottom, th);
1441 rc = dt_trans_cb_add(th, &dtbd->dtbd_cb);
1445 rc = dt_record_write(env, tdtd->tdtd_batchid_obj, &buf,
1448 CDEBUG(D_INFO, "%s: update batchid %llu: rc = %d\n",
1449 tdtd->tdtd_lut->lut_obd->obd_name, batchid, rc);
1452 dt_trans_stop(env, tdtd->tdtd_lut->lut_bottom, th);
1459 * Init commit batchid for distribute transaction.
1461 * Initialize the batchid object and get commit batchid from the object.
1463 * \param[in] env execution environment
1464 * \param[in] tdtd distribute transaction whose batchid is initialized.
1466 * \retval 0 if initialization succeeds.
1467 * \retval negative errno if initialization fails.
1470 distribute_txn_commit_batchid_init(const struct lu_env *env,
1471 struct target_distribute_txn_data *tdtd)
1473 struct tgt_thread_info *tti = tgt_th_info(env);
1474 struct lu_target *lut = tdtd->tdtd_lut;
1475 struct lu_attr *attr = &tti->tti_attr;
1476 struct lu_fid *fid = &tti->tti_fid1;
1477 struct dt_object_format *dof = &tti->tti_u.update.tti_update_dof;
1478 struct dt_object *dt_obj = NULL;
1485 memset(attr, 0, sizeof(*attr));
1486 attr->la_valid = LA_MODE;
1487 attr->la_mode = S_IFREG | S_IRUGO | S_IWUSR;
1488 dof->dof_type = dt_mode_to_dft(S_IFREG);
1490 lu_local_obj_fid(fid, BATCHID_COMMITTED_OID);
1492 dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof,
1494 if (IS_ERR(dt_obj)) {
1495 rc = PTR_ERR(dt_obj);
1500 tdtd->tdtd_batchid_obj = dt_obj;
1503 buf.lb_len = sizeof(tmp);
1505 rc = dt_read(env, dt_obj, &buf, &off);
1506 if (rc < 0 || (rc < buf.lb_len && rc > 0)) {
1507 CERROR("%s can't read last committed batchid: rc = %d\n",
1508 tdtd->tdtd_lut->lut_obd->obd_name, rc);
1512 } else if (rc == buf.lb_len) {
1513 tdtd->tdtd_committed_batchid = le64_to_cpu(tmp);
1514 CDEBUG(D_HA, "%s: committed batchid %llu\n",
1515 tdtd->tdtd_lut->lut_obd->obd_name,
1516 tdtd->tdtd_committed_batchid);
1521 if (rc < 0 && dt_obj != NULL) {
1522 dt_object_put(env, dt_obj);
1523 tdtd->tdtd_batchid_obj = NULL;
1529 * manage the distribute transaction thread
1531 * Distribute transaction are linked to the list, and once the distribute
1532 * transaction is committed, it will update the last committed batchid first,
1533 * after it is committed, it will cancel the records.
1535 * \param[in] _arg argument for commit thread
1537 * \retval 0 if thread is running successfully
1538 * \retval negative errno if the thread can not be run.
1540 static int distribute_txn_commit_thread(void *_arg)
1542 struct target_distribute_txn_data *tdtd = _arg;
1543 struct lu_target *lut = tdtd->tdtd_lut;
1544 struct ptlrpc_thread *thread = &lut->lut_tdtd_commit_thread;
1545 struct l_wait_info lwi = { 0 };
1547 struct list_head list;
1549 struct top_multiple_thandle *tmt;
1550 struct top_multiple_thandle *tmp;
1551 __u64 batchid = 0, committed;
1555 rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
1559 spin_lock(&tdtd->tdtd_batchid_lock);
1560 thread->t_flags = SVC_RUNNING;
1561 spin_unlock(&tdtd->tdtd_batchid_lock);
1562 wake_up(&thread->t_ctl_waitq);
1563 INIT_LIST_HEAD(&list);
1565 CDEBUG(D_HA, "%s: start commit thread committed batchid %llu\n",
1566 tdtd->tdtd_lut->lut_obd->obd_name,
1567 tdtd->tdtd_committed_batchid);
1569 while (distribute_txn_commit_thread_running(lut)) {
1570 spin_lock(&tdtd->tdtd_batchid_lock);
1571 list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
1573 if (tmt->tmt_committed == 0)
1576 /* Note: right now, replay is based on master MDT
1577 * transno, but cancellation is based on batchid.
1578 * so we do not try to cancel the update log until
1579 * the recoverying is done, unless the update records
1580 * batchid < committed_batchid. */
1581 if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) {
1582 list_move_tail(&tmt->tmt_commit_list, &list);
1583 } else if (!tdtd->tdtd_lut->lut_obd->obd_recovering) {
1584 LASSERTF(tmt->tmt_batchid >= batchid,
1585 "tmt %p tmt_batchid: %llu, batchid "
1586 "%llu\n", tmt, tmt->tmt_batchid,
1588 /* There are three types of distribution
1589 * transaction result
1591 * 1. If tmt_result < 0, it means the
1592 * distribution transaction fails, which should
1593 * be rare, because once declare phase succeeds,
1594 * the operation should succeeds anyway. Note in
1595 * this case, we will still update batchid so
1596 * cancellation would be stopped.
1598 * 2. If tmt_result == 0, it means the
1599 * distribution transaction succeeds, and we
1600 * will update batchid.
1602 * 3. If tmt_result > 0, it means distribute
1603 * transaction is not yet committed on every
1604 * node, but we need release this tmt before
1605 * that, which usuually happens during umount.
1607 if (tmt->tmt_result <= 0)
1608 batchid = tmt->tmt_batchid;
1609 list_move_tail(&tmt->tmt_commit_list, &list);
1612 spin_unlock(&tdtd->tdtd_batchid_lock);
1614 CDEBUG(D_HA, "%s: batchid: %llu committed batchid "
1615 "%llu\n", tdtd->tdtd_lut->lut_obd->obd_name, batchid,
1616 tdtd->tdtd_committed_batchid);
1617 /* update globally committed on a storage */
1618 if (batchid > tdtd->tdtd_committed_batchid) {
1619 rc = distribute_txn_commit_batchid_update(&env, tdtd,
1624 /* cancel the records for committed batchid's */
1625 /* XXX: should we postpone cancel's till the end of recovery? */
1626 committed = tdtd->tdtd_committed_batchid;
1627 list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
1628 if (tmt->tmt_batchid > committed)
1630 list_del_init(&tmt->tmt_commit_list);
1631 if (tmt->tmt_result <= 0)
1632 distribute_txn_cancel_records(&env, tmt);
1633 top_multiple_thandle_put(tmt);
1636 l_wait_event(tdtd->tdtd_commit_thread_waitq,
1637 !distribute_txn_commit_thread_running(lut) ||
1638 committed < tdtd->tdtd_committed_batchid ||
1639 tdtd_ready_for_cancel_log(tdtd), &lwi);
1642 l_wait_event(tdtd->tdtd_commit_thread_waitq,
1643 atomic_read(&tdtd->tdtd_refcount) == 0, &lwi);
1645 spin_lock(&tdtd->tdtd_batchid_lock);
1646 list_for_each_entry_safe(tmt, tmp, &tdtd->tdtd_list,
1648 list_move_tail(&tmt->tmt_commit_list, &list);
1649 spin_unlock(&tdtd->tdtd_batchid_lock);
1651 CDEBUG(D_INFO, "%s stopping distribute txn commit thread.\n",
1652 tdtd->tdtd_lut->lut_obd->obd_name);
1653 list_for_each_entry_safe(tmt, tmp, &list, tmt_commit_list) {
1654 list_del_init(&tmt->tmt_commit_list);
1655 top_multiple_thandle_dump(tmt, D_HA);
1656 top_multiple_thandle_put(tmt);
1659 thread->t_flags = SVC_STOPPED;
1661 wake_up(&thread->t_ctl_waitq);
1667 * Start llog cancel thread
1669 * Start llog cancel(master/slave) thread on LOD
1671 * \param[in]lclt cancel log thread to be started.
1673 * \retval 0 if the thread is started successfully.
1674 * \retval negative errno if the thread is not being
1677 int distribute_txn_init(const struct lu_env *env,
1678 struct lu_target *lut,
1679 struct target_distribute_txn_data *tdtd,
1682 struct task_struct *task;
1683 struct l_wait_info lwi = { 0 };
1687 INIT_LIST_HEAD(&tdtd->tdtd_list);
1688 INIT_LIST_HEAD(&tdtd->tdtd_replay_finish_list);
1689 INIT_LIST_HEAD(&tdtd->tdtd_replay_list);
1690 spin_lock_init(&tdtd->tdtd_batchid_lock);
1691 spin_lock_init(&tdtd->tdtd_replay_list_lock);
1692 tdtd->tdtd_replay_handler = distribute_txn_replay_handle;
1693 tdtd->tdtd_replay_ready = 0;
1695 tdtd->tdtd_batchid = lut->lut_last_transno + 1;
1697 init_waitqueue_head(&lut->lut_tdtd_commit_thread.t_ctl_waitq);
1698 init_waitqueue_head(&tdtd->tdtd_commit_thread_waitq);
1699 init_waitqueue_head(&tdtd->tdtd_recovery_threads_waitq);
1700 atomic_set(&tdtd->tdtd_refcount, 0);
1701 atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
1703 tdtd->tdtd_lut = lut;
1704 if (lut->lut_bottom->dd_rdonly)
1707 rc = distribute_txn_commit_batchid_init(env, tdtd);
1711 task = kthread_run(distribute_txn_commit_thread, tdtd, "tdtd-%u",
1714 RETURN(PTR_ERR(task));
1716 l_wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
1717 distribute_txn_commit_thread_running(lut) ||
1718 distribute_txn_commit_thread_stopped(lut), &lwi);
1721 EXPORT_SYMBOL(distribute_txn_init);
1724 * Stop llog cancel thread
1726 * Stop llog cancel(master/slave) thread on LOD and also destory
1727 * all of transaction in the list.
1729 * \param[in]lclt cancel log thread to be stopped.
1731 void distribute_txn_fini(const struct lu_env *env,
1732 struct target_distribute_txn_data *tdtd)
1734 struct lu_target *lut = tdtd->tdtd_lut;
1736 /* Stop cancel thread */
1737 if (lut == NULL || !distribute_txn_commit_thread_running(lut))
1740 spin_lock(&tdtd->tdtd_batchid_lock);
1741 lut->lut_tdtd_commit_thread.t_flags = SVC_STOPPING;
1742 spin_unlock(&tdtd->tdtd_batchid_lock);
1743 wake_up(&tdtd->tdtd_commit_thread_waitq);
1744 wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq,
1745 lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED);
1747 dtrq_list_destroy(tdtd);
1748 if (tdtd->tdtd_batchid_obj != NULL) {
1749 dt_object_put(env, tdtd->tdtd_batchid_obj);
1750 tdtd->tdtd_batchid_obj = NULL;
1753 EXPORT_SYMBOL(distribute_txn_fini);