4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2014, Intel Corporation.
27 * lustre/target/update_recovery.c
29 * This file implement the methods to handle the update recovery.
31 * During DNE recovery, the recovery thread will redo the operation according
32 * to the transaction no, and these replay are either from client replay req
33 * or update replay records(for distribute transaction) in the update log.
34 * For distribute transaction replay, the replay thread will call
35 * distribute_txn_replay_handle() to handle the updates.
37 * After the Master MDT restarts, it will retrieve the update records from all
38 * of MDTs, for each distributed operation, it will check updates on all MDTs,
39 * if some updates records are missing on some MDTs, the replay thread will redo
40 * updates on these MDTs.
42 * Author: Di Wang <di.wang@intel.com>
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
54 * Lookup distribute_txn_replay req
56 * Lookup distribute_txn_replay in the replay list by batchid.
57 * It is assumed the list has been locked before calling this function.
59 * \param[in] tdtd distribute_txn_data, which holds the replay
61 * \param[in] batchid batchid used by lookup.
63 * \retval pointer of the replay if succeeds.
64 * \retval NULL if can not find it.
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
69 struct distribute_txn_replay_req *tmp;
70 struct distribute_txn_replay_req *dtrq = NULL;
/* Linear scan of the replay list keyed on ur_batchid.  NOTE(review):
 * the match handling (saving tmp into dtrq, break) and the function
 * epilogue are missing from this excerpt — confirm against the full
 * source before relying on the exact return path. */
72 list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73 if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
82 * insert distribute txn replay req
84 * Insert distribute txn replay to the replay list, and it assumes the
85 * list has been locked. Note: the replay list is a sorted list, which
86 * is sorted by master transno. It is assumed the replay list has been
87 * locked before calling this function.
89 * \param[in] tdtd target distribute txn data where replay list is
90 * \param[in] new distribute txn replay to be inserted
92 * \retval 0 if insertion succeeds
93 * \retval EEXIST if the dtrq already exists
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96 struct distribute_txn_replay_req *new)
98 struct distribute_txn_replay_req *iter;
/* Walk backwards so the common case (appending the newest transno)
 * terminates quickly; insertion point is the first entry whose
 * master transno is not greater than the new one. */
100 list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
101 if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
102 new->dtrq_lur->lur_update_rec.ur_master_transno)
105 /* Same master transno AND same batchid means this replay req
106 * is already in the list: report it as a duplicate (EEXIST,
107 * per the \retval above). */
107 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
108 new->dtrq_lur->lur_update_rec.ur_master_transno &&
109 iter->dtrq_lur->lur_update_rec.ur_batchid ==
110 new->dtrq_lur->lur_update_rec.ur_batchid)
113 /* If there are multiple replay reqs with the same transno,
114 * then sort them by batchid as a secondary key. */
115 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
116 new->dtrq_lur->lur_update_rec.ur_master_transno &&
117 iter->dtrq_lur->lur_update_rec.ur_batchid >
118 new->dtrq_lur->lur_update_rec.ur_batchid)
121 list_add(&new->dtrq_list, &iter->dtrq_list);
/* Nothing was greater: new has the smallest transno, add at head. */
125 if (list_empty(&new->dtrq_list))
126 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
132 * create distribute txn replay req
134 * Allocate distribute txn replay req according to the update records.
136 * \param[in] lur llog update record from the update log; it is
137 * copied into the new req, caller keeps ownership.
139 * \retval the pointer of distribute txn replay req if
140 * the creation succeeds.
141 * \retval ERR_PTR(-ENOMEM) if an allocation fails.
143 static struct distribute_txn_replay_req *
144 dtrq_create(struct llog_update_record *lur)
146 struct distribute_txn_replay_req *new;
150 RETURN(ERR_PTR(-ENOMEM));
/* Keep a private copy of the whole llog record so the replay req
 * outlives the llog processing callback that handed us lur. */
152 new->dtrq_lur_size = llog_update_record_size(lur);
153 OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
154 if (new->dtrq_lur == NULL) {
156 RETURN(ERR_PTR(-ENOMEM));
159 memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
161 spin_lock_init(&new->dtrq_sub_list_lock);
162 INIT_LIST_HEAD(&new->dtrq_sub_list);
163 INIT_LIST_HEAD(&new->dtrq_list);
169 * Lookup distribute sub replay
171 * Lookup distribute sub replay in the sub list of distribute_txn_replay by
172 * mdt_index.
174 * \param[in] dtrq the distribute txn replay req to lookup
175 * \param[in] mdt_index the mdt_index as the key of lookup
177 * \retval the pointer of sub replay if it can be found.
178 * \retval NULL if it can not find.
180 struct distribute_txn_replay_req_sub *
181 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
183 struct distribute_txn_replay_req_sub *dtrqs = NULL;
184 struct distribute_txn_replay_req_sub *tmp;
/* Caller is expected to hold dtrq_sub_list_lock (see callers in
 * dtrq_sub_create_and_insert).  NOTE(review): the match handling
 * and return are missing from this excerpt. */
186 list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
187 if (tmp->dtrqs_mdt_index == mdt_index) {
196 * Insert distribute txn sub req replay
198 * Allocate sub replay req and insert distribute txn replay list.
200 * \param[in] dtrq distribute txn replay req the sub req is added to
201 * \param[in] cookie the cookie of the update record
202 * \param[in] mdt_index the mdt_index of the update record
204 * \retval 0 if the adding succeeds.
205 * \retval negative errno if the adding fails.
208 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
209 struct llog_cookie *cookie,
212 struct distribute_txn_replay_req_sub *dtrqs = NULL;
213 struct distribute_txn_replay_req_sub *new;
/* Fast path: sub req for this MDT already exists, nothing to do. */
216 spin_lock(&dtrq->dtrq_sub_list_lock);
217 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
218 spin_unlock(&dtrq->dtrq_sub_list_lock);
226 INIT_LIST_HEAD(&new->dtrqs_list);
227 new->dtrqs_mdt_index = mdt_index;
228 new->dtrqs_llog_cookie = *cookie;
/* Re-check under the lock: another thread may have inserted the
 * same mdt_index while we allocated; only insert if still absent. */
229 spin_lock(&dtrq->dtrq_sub_list_lock);
230 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
232 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
235 spin_unlock(&dtrq->dtrq_sub_list_lock);
241 * Insert update records to the replay list.
243 * Allocate distribute txn replay req and insert it into the replay
244 * list, then insert the update records into the replay req.
246 * \param[in] tdtd distribute txn replay data where the replay list
247 * resides
248 * \param[in] lur the llog update record
249 * \param[in] cookie cookie of the record
250 * \param[in] mdt_index mdt index of the record
252 * \retval 0 if the adding succeeds.
253 * \retval negative errno if the adding fails.
256 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
257 struct llog_update_record *lur,
258 struct llog_cookie *cookie,
261 struct distribute_txn_replay_req *dtrq;
262 struct update_records *record = &lur->lur_update_rec;
266 CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
267 " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
268 record->ur_batchid, record->ur_master_transno, mdt_index);
/* NOTE(review): the list lock is dropped between this lookup and the
 * create/insert below — presumably safe because only the single
 * recovery thread populates the list; confirm against callers. */
270 spin_lock(&tdtd->tdtd_replay_list_lock);
271 dtrq = dtrq_lookup(tdtd, record->ur_batchid);
272 spin_unlock(&tdtd->tdtd_replay_list_lock);
274 /* If the transno in the update record is 0, it means the
275 * update are from master MDT, and we will use the master
276 * last committed transno as its batchid. Note: if it got
277 * the records from the slave later, it needs to update
278 * the batchid by the transno in slave update log (see below) */
279 dtrq = dtrq_create(lur);
281 RETURN(PTR_ERR(dtrq));
283 if (record->ur_master_transno == 0)
284 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
285 tdtd->tdtd_lut->lut_last_transno;
286 spin_lock(&tdtd->tdtd_replay_list_lock);
287 rc = dtrq_insert(tdtd, dtrq);
288 spin_unlock(&tdtd->tdtd_replay_list_lock);
289 } else if (record->ur_master_transno != 0 &&
290 dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
291 record->ur_master_transno) {
292 /* If the master transno in update header is not matched with
293 * the one in the record, then it means the dtrq is originally
294 * created by master record, and we need update master transno
295 * and reposition the dtrq(by master transno). */
296 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
297 record->ur_master_transno;
298 list_del_init(&dtrq->dtrq_list);
299 spin_lock(&tdtd->tdtd_replay_list_lock);
300 rc = dtrq_insert(tdtd, dtrq);
301 spin_unlock(&tdtd->tdtd_replay_list_lock);
/* Record which MDT (and llog cookie) this update came from, so the
 * replay logic can tell committed sub-updates from missing ones. */
310 rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
314 EXPORT_SYMBOL(insert_update_records_to_replay_list);
317 * Dump updates of distribute txns.
319 * Output all of recovery updates in the distribute txn list to the
320 * debug log, under the given debug mask.
322 * \param[in] tdtd distribute txn data where all of distribute txn
323 * replay reqs are listed
324 * \param[in] mask debug mask
326 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
328 struct distribute_txn_replay_req *dtrq;
/* Hold the list lock for the whole walk; update_records_dump() only
 * reads the records, so no entry state is modified here. */
330 spin_lock(&tdtd->tdtd_replay_list_lock);
331 list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
332 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
334 spin_unlock(&tdtd->tdtd_replay_list_lock);
336 EXPORT_SYMBOL(dtrq_list_dump);
339 * Destroy distribute txn replay req
341 * Destroy distribute txn replay req and all of subs.
343 * \param[in] dtrq distribute txn replay req to be destroyed.
345 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
347 struct distribute_txn_replay_req_sub *dtrqs;
348 struct distribute_txn_replay_req_sub *tmp;
/* The req must already be unlinked from the replay list; freeing a
 * still-linked req would leave a dangling list entry. */
350 LASSERT(list_empty(&dtrq->dtrq_list));
351 spin_lock(&dtrq->dtrq_sub_list_lock);
352 list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
353 list_del(&dtrqs->dtrqs_list);
356 spin_unlock(&dtrq->dtrq_sub_list_lock);
358 if (dtrq->dtrq_lur != NULL)
359 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
363 EXPORT_SYMBOL(dtrq_destroy);
366 * Destroy all of replay req.
368 * Destroy all of replay req in the replay list.
370 * \param[in] tdtd target distribute txn data where the replay list is.
372 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
374 struct distribute_txn_replay_req *dtrq;
375 struct distribute_txn_replay_req *tmp;
/* _safe iteration: each entry is unlinked (and presumably destroyed
 * via dtrq_destroy — the call is missing from this excerpt) while
 * walking, so the next pointer must be cached. */
377 spin_lock(&tdtd->tdtd_replay_list_lock);
378 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
380 list_del_init(&dtrq->dtrq_list);
383 spin_unlock(&tdtd->tdtd_replay_list_lock);
385 EXPORT_SYMBOL(dtrq_list_destroy);
388 * Get next req in the replay list
390 * Get next req needs to be replayed, since it is a sorted list
391 * (by master MDT transno)
393 * \param[in] tdtd distribute txn data where the replay list is
395 * \retval the pointer of update recovery header, or NULL if the
396 * replay list is empty.
397 struct distribute_txn_replay_req *
398 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
400 struct distribute_txn_replay_req *dtrq = NULL;
/* Pop the head (lowest master transno).  The entry is unlinked here,
 * so ownership transfers to the caller. */
402 spin_lock(&tdtd->tdtd_replay_list_lock);
403 if (!list_empty(&tdtd->tdtd_replay_list)) {
404 dtrq = list_entry(tdtd->tdtd_replay_list.next,
405 struct distribute_txn_replay_req, dtrq_list);
406 list_del_init(&dtrq->dtrq_list);
408 spin_unlock(&tdtd->tdtd_replay_list_lock);
412 EXPORT_SYMBOL(distribute_txn_get_next_req);
415 * Get next transno in the replay list, because this is the sorted
416 * list, so it will return the transno of next req in the list.
418 * \param[in] tdtd distribute txn data where the replay list is
420 * \retval the transno of next update in the list
422 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
424 struct distribute_txn_replay_req *dtrq = NULL;
/* Peek only: unlike distribute_txn_get_next_req() the head entry is
 * left on the list. */
427 spin_lock(&tdtd->tdtd_replay_list_lock);
428 if (!list_empty(&tdtd->tdtd_replay_list)) {
429 dtrq = list_entry(tdtd->tdtd_replay_list.next,
430 struct distribute_txn_replay_req, dtrq_list);
431 transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
433 spin_unlock(&tdtd->tdtd_replay_list_lock);
435 CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
436 tdtd->tdtd_lut->lut_obd->obd_name, transno);
439 EXPORT_SYMBOL(distribute_txn_get_next_transno);
442 * Check if the update of one object is committed
444 * Check whether the update for the object is committed by checking whether
445 * the correspondent sub exists in the replay req. If it is committed, mark
446 * the committed flag in correspondent the sub thandle.
448 * \param[in] env execution environment
449 * \param[in] dtrq replay request
450 * \param[in] dt_obj object for the update
451 * \param[in] top_th top thandle
452 * \param[in] st sub thandle which the update belongs to
454 * \retval 1 if the update is not committed.
455 * \retval 0 if the update is committed.
456 * \retval negative errno if some other failures happen.
458 static int update_is_committed(const struct lu_env *env,
459 struct distribute_txn_replay_req *dtrq,
460 struct dt_object *dt_obj,
461 struct top_thandle *top_th,
462 struct sub_thandle *st)
464 struct seq_server_site *seq_site;
465 const struct lu_fid *fid = lu_object_fid(&dt_obj->do_lu);
466 struct distribute_txn_replay_req_sub *dtrqs;
/* A sub thandle already created, or already marked committed, needs
 * no further checking (early-return paths; bodies elided in this
 * excerpt). */
470 if (st->st_sub_th != NULL)
473 if (st->st_committed)
/* Resolve which MDT owns this object: update-log FIDs encode the MDT
 * index in the OID; local (non-FLDB) sequences belong to this node;
 * everything else needs an FLD lookup. */
476 seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
477 if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
478 mdt_index = fid_oid(fid);
479 } else if (!fid_seq_in_fldb(fid_seq(fid))) {
480 mdt_index = seq_site->ss_node_id;
482 struct lu_server_fld *fld;
483 struct lu_seq_range range = {0};
486 fld = seq_site->ss_server_fld;
487 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
488 LASSERT(fld->lsf_seq_lookup != NULL);
489 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
493 mdt_index = range.lsr_index;
/* If that MDT has a sub record in the replay req (i.e. its update
 * log entry survived), or the whole txn is known committed, the
 * update need not be replayed there. */
496 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
497 if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
498 st->st_committed = 1;
500 st->st_cookie = dtrqs->dtrqs_llog_cookie;
504 CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
511 * Implementation of different update methods for update recovery.
513 * These following functions update_recovery_$(update_name) implement
514 * different updates recovery methods. They will extract the parameters
515 * from the common parameters area and call correspondent dt API to redo
516 * the update.
518 * \param[in] env execution environment
519 * \param[in] op update operation to be replayed
520 * \param[in] params common update parameters which holds all parameters
521 * of the operation
522 * \param[in] th transaction handle
523 * \param[in] declare indicate it will do declare or real execution, true
524 * means declare, false means real execution
526 * \retval 0 if it succeeds.
527 * \retval negative errno if it fails.
529 static int update_recovery_create(const struct lu_env *env,
530 struct dt_object *dt_obj,
531 const struct update_op *op,
532 const struct update_params *params,
533 struct thandle_exec_args *ta,
536 struct update_thread_info *uti = update_env_info(env);
537 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
538 struct lu_attr *attr = &uti->uti_attr;
540 struct obdo *lobdo = &uti->uti_obdo;
541 struct dt_object_format dof;
543 unsigned int param_count;
/* Replay is idempotent: if the object already exists the create was
 * already applied, so do nothing. */
547 if (dt_object_exists(dt_obj))
550 param_count = lur->lur_update_rec.ur_param_count;
551 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
555 if (size != sizeof(*wobdo))
/* The on-disk wire obdo may be in the opposite endianness; swab it
 * according to the llog record header before unpacking. */
558 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
559 lustre_swab_obdo(wobdo)
561 lustre_get_wire_obdo(NULL, lobdo, wobdo);
562 la_from_obdo(attr, lobdo, lobdo->o_valid);
564 dof.dof_type = dt_mode_to_dft(attr->la_mode);
566 rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
/* Replay an OUT destroy update: no parameters to unpack, simply
 * queue the destroy via out_tx_destroy(). */
572 static int update_recovery_destroy(const struct lu_env *env,
573 struct dt_object *dt_obj,
574 const struct update_op *op,
575 const struct update_params *params,
576 struct thandle_exec_args *ta,
582 rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
/* Replay a link-count increment: parameterless, delegates to
 * out_tx_ref_add(). */
587 static int update_recovery_ref_add(const struct lu_env *env,
588 struct dt_object *dt_obj,
589 const struct update_op *op,
590 const struct update_params *params,
591 struct thandle_exec_args *ta,
597 rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
/* Replay a link-count decrement: parameterless, delegates to
 * out_tx_ref_del(). */
602 static int update_recovery_ref_del(const struct lu_env *env,
603 struct dt_object *dt_obj,
604 const struct update_op *op,
605 const struct update_params *params,
606 struct thandle_exec_args *ta,
612 rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
/* Replay an attr_set update: unpack the wire obdo parameter (swabbing
 * if the llog record came from the other endianness), convert it to a
 * lu_attr and queue out_tx_attr_set(). */
617 static int update_recovery_attr_set(const struct lu_env *env,
618 struct dt_object *dt_obj,
619 const struct update_op *op,
620 const struct update_params *params,
621 struct thandle_exec_args *ta,
624 struct update_thread_info *uti = update_env_info(env);
625 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
627 struct obdo *lobdo = &uti->uti_obdo;
628 struct lu_attr *attr = &uti->uti_attr;
630 unsigned int param_count;
634 param_count = lur->lur_update_rec.ur_param_count;
635 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
/* A size mismatch means a corrupt/truncated parameter buffer. */
639 if (size != sizeof(*wobdo))
642 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
643 lustre_swab_obdo(wobdo);
645 lustre_get_wire_obdo(NULL, lobdo, wobdo);
646 la_from_obdo(attr, lobdo, lobdo->o_valid);
648 rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
/* Replay an xattr_set update: three packed parameters — xattr name
 * (param 0), value buffer (param 1) and flags (param 2). */
653 static int update_recovery_xattr_set(const struct lu_env *env,
654 struct dt_object *dt_obj,
655 const struct update_op *op,
656 const struct update_params *params,
657 struct thandle_exec_args *ta,
660 struct update_thread_info *uti = update_env_info(env);
669 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
670 name = update_params_get_param_buf(params,
671 op->uop_params_off[0],
676 buf = update_params_get_param_buf(params,
677 op->uop_params_off[1],
682 uti->uti_buf.lb_buf = buf;
683 uti->uti_buf.lb_len = (size_t)size;
685 buf = update_params_get_param_buf(params, op->uop_params_off[2],
689 if (size != sizeof(fl))
/* Flags are stored little-endian in the record. */
692 fl = le32_to_cpu(*(int *)buf);
694 rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
/* Replay an index (directory) insert: parameters are the entry name
 * (param 0), the target FID (param 1, little-endian on disk) and the
 * entry type (param 2). */
700 static int update_recovery_index_insert(const struct lu_env *env,
701 struct dt_object *dt_obj,
702 const struct update_op *op,
703 const struct update_params *params,
704 struct thandle_exec_args *ta,
707 struct update_thread_info *uti = update_env_info(env);
717 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
718 name = update_params_get_param_buf(params, op->uop_params_off[0],
723 fid = update_params_get_param_buf(params, op->uop_params_off[1],
727 if (size != sizeof(*fid))
/* NOTE(review): converts the FID in place inside the record buffer —
 * appears intentional (buffer is this req's private copy), confirm
 * the record is not reused with the converted FID. */
730 fid_le_to_cpu(fid, fid);
732 ptype = update_params_get_param_buf(params, op->uop_params_off[2],
736 if (size != sizeof(*ptype))
738 type = le32_to_cpu(*ptype);
/* Object must expose index operations to accept a dir entry. */
740 if (dt_try_as_dir(env, dt_obj) == 0)
743 uti->uti_rec.rec_fid = fid;
744 uti->uti_rec.rec_type = type;
746 rc = out_tx_index_insert(env, dt_obj,
747 (const struct dt_rec *)&uti->uti_rec,
748 (const struct dt_key *)name, ta, th,
/* Replay an index (directory) delete: single parameter, the entry
 * name to remove. */
754 static int update_recovery_index_delete(const struct lu_env *env,
755 struct dt_object *dt_obj,
756 const struct update_op *op,
757 const struct update_params *params,
758 struct thandle_exec_args *ta,
761 struct update_thread_info *uti = update_env_info(env);
768 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
769 name = update_params_get_param_buf(params, op->uop_params_off[0],
774 if (dt_try_as_dir(env, dt_obj) == 0)
777 rc = out_tx_index_delete(env, dt_obj,
778 (const struct dt_key *)name, ta, th, NULL, 0);
/* Replay a write update: param 0 is the data buffer, param 1 the
 * 64-bit little-endian file offset. */
783 static int update_recovery_write(const struct lu_env *env,
784 struct dt_object *dt_obj,
785 const struct update_op *op,
786 const struct update_params *params,
787 struct thandle_exec_args *ta,
790 struct update_thread_info *uti = update_env_info(env);
798 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
799 buf = update_params_get_param_buf(params, op->uop_params_off[0],
804 uti->uti_buf.lb_buf = buf;
805 uti->uti_buf.lb_len = size;
807 buf = update_params_get_param_buf(params, op->uop_params_off[1],
812 pos = le64_to_cpu(*(__u64 *)buf);
814 rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
/* Replay an xattr delete: single parameter, the xattr name. */
820 static int update_recovery_xattr_del(const struct lu_env *env,
821 struct dt_object *dt_obj,
822 const struct update_op *op,
823 const struct update_params *params,
824 struct thandle_exec_args *ta,
827 struct update_thread_info *uti = update_env_info(env);
834 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
835 name = update_params_get_param_buf(params, op->uop_params_off[0],
840 rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
846 * Execute updates in the update replay records
848 * Declare distribute txn replay by update records and add the updates
849 * to the execution list. Note: it will check if the update has been
850 * committed, and only execute the updates if it is not committed to
851 * the targets yet.
853 * \param[in] env execution environment
854 * \param[in] tdtd distribute txn replay data which hold all of replay
855 * reqs and all replay parameters.
856 * \param[in] dtrq distribute transaction replay req.
857 * \param[in] ta thandle execute args.
859 * \retval 0 if declare succeeds.
860 * \retval negative errno if declare fails.
862 static int update_recovery_exec(const struct lu_env *env,
863 struct target_distribute_txn_data *tdtd,
864 struct distribute_txn_replay_req *dtrq,
865 struct thandle_exec_args *ta)
867 struct llog_update_record *lur = dtrq->dtrq_lur;
868 struct update_records *records = &lur->lur_update_rec;
869 struct update_ops *ops = &records->ur_ops;
870 struct update_params *params = update_records_get_params(records);
871 struct top_thandle *top_th = container_of(ta->ta_handle,
874 struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
875 struct update_op *op;
880 /* These records have been swabbed in llog_cat_process() */
881 for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
882 i++, op = update_op_next_op(op)) {
883 struct lu_fid *fid = &op->uop_fid;
884 struct dt_object *dt_obj;
885 struct dt_object *sub_dt_obj;
886 struct dt_device *sub_dt;
887 struct sub_thandle *st;
889 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
890 if (IS_ERR(dt_obj)) {
891 rc = PTR_ERR(dt_obj);
894 sub_dt_obj = dt_object_child(dt_obj);
896 /* Create sub thandle if not */
897 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
898 st = lookup_sub_thandle(tmt, sub_dt);
900 st = create_sub_thandle(tmt, sub_dt);
902 GOTO(next, rc = PTR_ERR(st));
905 /* check if updates on the OSD/OSP are committed */
906 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
908 /* If this is committed, goto next */
914 /* Create thandle for sub thandle if needed */
915 if (st->st_sub_th == NULL) {
916 rc = sub_thandle_trans_create(env, top_th, st);
921 CDEBUG(D_HA, "replay %uth update\n", i);
/* Dispatch to the matching update_recovery_* handler by op type.
 * NOTE(review): several case labels and argument lists are elided
 * in this excerpt. */
922 switch (op->uop_type) {
924 rc = update_recovery_create(env, sub_dt_obj,
929 rc = update_recovery_destroy(env, sub_dt_obj,
934 rc = update_recovery_ref_add(env, sub_dt_obj,
939 rc = update_recovery_ref_del(env, sub_dt_obj,
944 rc = update_recovery_attr_set(env, sub_dt_obj,
949 rc = update_recovery_xattr_set(env, sub_dt_obj,
953 case OUT_INDEX_INSERT:
954 rc = update_recovery_index_insert(env, sub_dt_obj,
958 case OUT_INDEX_DELETE:
959 rc = update_recovery_index_delete(env, sub_dt_obj,
964 rc = update_recovery_write(env, sub_dt_obj,
969 rc = update_recovery_xattr_del(env, sub_dt_obj,
974 CERROR("Unknown update type %u\n", (__u32)op->uop_type);
/* Balance the dt_locate() reference on every path through the
 * loop body ("next" label). */
979 lu_object_put(env, &dt_obj->do_lu);
984 ta->ta_handle->th_result = rc;
989 * redo updates on MDT if needed.
991 * During DNE recovery, the recovery thread (target_recovery_thread) will call
992 * this function to replay distribute txn updates on all MDTs. It only replay
993 * updates on the MDT where the update record is missing.
995 * If the update already exists on the MDT, then it does not need replay the
996 * updates on that MDT, and only mark the sub transaction has been committed
997 * there.
999 * \param[in] env execution environment
1000 * \param[in] tdtd target distribute txn data, which holds the replay list
1001 * and all parameters needed by replay process.
1002 * \param[in] dtrq distribute txn replay req.
1004 * \retval 0 if replay succeeds.
1005 * \retval negative errno if replay fails.
1007 int distribute_txn_replay_handle(struct lu_env *env,
1008 struct target_distribute_txn_data *tdtd,
1009 struct distribute_txn_replay_req *dtrq)
1011 struct update_records *records = &dtrq->dtrq_lur->lur_update_rec;
1012 struct thandle_exec_args *ta;
1013 struct lu_context session_env;
1014 struct thandle *th = NULL;
1015 struct top_thandle *top_th;
1016 struct top_multiple_thandle *tmt;
1017 struct thandle_update_records *tur = NULL;
1022 /* initialize session, it is needed for the handler of target */
1023 rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1025 CERROR("%s: failure to initialize session: rc = %d\n",
1026 tdtd->tdtd_lut->lut_obd->obd_name, rc);
1029 lu_context_enter(&session_env);
1030 env->le_ses = &session_env;
1032 update_records_dump(records, D_HA, true);
1033 th = top_trans_create(env, NULL);
1035 GOTO(exit_session, rc = PTR_ERR(th));
1037 ta = &update_env_info(env)->uti_tea;
1040 update_env_info(env)->uti_dtrq = dtrq;
1041 /* Create distribute transaction structure for this top thandle */
1042 top_th = container_of(th, struct top_thandle, tt_super);
1043 rc = top_trans_create_tmt(env, top_th);
1045 GOTO(stop_trans, rc);
1049 /* check if the distribute transaction has been committed */
1050 tmt = top_th->tt_multiple_thandle;
1051 tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1052 tmt->tmt_batchid = records->ur_batchid;
1053 tgt_th_info(env)->tti_transno = records->ur_master_transno;
/* Batchids at or below the globally committed batchid need no redo
 * anywhere; mark the whole txn committed up front. */
1055 if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1056 tmt->tmt_committed = 1;
/* Walk the update records, queueing only the updates whose target
 * MDT is missing them (see update_recovery_exec()). */
1058 rc = update_recovery_exec(env, tdtd, dtrq, ta);
1060 GOTO(stop_trans, rc);
1062 /* If no updates are needed to be replayed, then
1063 * mark this records as committed, so commit thread
1064 * distribute_txn_commit_thread() will delete the
1065 * record */
1066 if (ta->ta_argno == 0)
1067 tmt->tmt_committed = 1;
/* Attach the original update records to the transaction so they can
 * be re-logged for the MDTs being repaired. */
1069 tur = &update_env_info(env)->uti_tur;
1070 tur->tur_update_records = dtrq->dtrq_lur;
1071 tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1072 tur->tur_update_params = NULL;
1073 tur->tur_update_param_count = 0;
1074 tmt->tmt_update_records = tur;
1076 distribute_txn_insert_by_batchid(tmt);
1077 rc = top_trans_start(env, NULL, th);
1079 GOTO(stop_trans, rc);
/* Execute phase: run each queued update against its sub thandle. */
1081 for (i = 0; i < ta->ta_argno; i++) {
1082 struct tx_arg *ta_arg;
1083 struct dt_object *dt_obj;
1084 struct dt_device *sub_dt;
1085 struct sub_thandle *st;
1087 ta_arg = ta->ta_args[i];
1088 dt_obj = ta_arg->object;
/* A committed txn must not reach the execute loop: having args
 * queued contradicts tmt_committed. */
1090 LASSERT(tmt->tmt_committed == 0);
1091 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1092 st = lookup_sub_thandle(tmt, sub_dt);
1093 LASSERT(st != NULL);
1094 LASSERT(st->st_sub_th != NULL);
1095 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1097 if (unlikely(rc < 0)) {
1098 CDEBUG(D_HA, "error during execution of #%u from"
1099 " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1100 ta->ta_args[i]->line, rc);
/* Roll back already-executed args in reverse using their
 * undo_fn; an arg without undo support is unrecoverable
 * and only logged. NOTE(review): the reverse-iteration
 * control lines are elided in this excerpt. */
1102 if (ta->ta_args[i]->undo_fn != NULL) {
1103 dt_obj = ta->ta_args[i]->object;
1105 lu2dt_dev(dt_obj->do_lu.lo_dev);
1106 st = lookup_sub_thandle(tmt, sub_dt);
1107 LASSERT(st != NULL);
1108 LASSERT(st->st_sub_th != NULL);
1110 ta->ta_args[i]->undo_fn(env,
1114 CERROR("%s: undo for %s:%d: rc = %d\n",
1115 dt_obd_name(ta->ta_handle->th_dev),
1116 ta->ta_args[i]->file,
1117 ta->ta_args[i]->line, -ENOTSUPP);
1122 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1123 dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1129 rc = top_trans_stop(env, tdtd->tdtd_dt, th);
/* Release every object reference taken during declare/execute. */
1130 for (i = 0; i < ta->ta_argno; i++) {
1131 if (ta->ta_args[i]->object != NULL) {
1132 lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1133 ta->ta_args[i]->object = NULL;
/* Detach the record pointer: dtrq owns dtrq_lur, tur must not keep
 * a reference past this function. */
1138 tur->tur_update_records = NULL;
1140 lu_context_exit(&session_env);
1141 lu_context_fini(&session_env);
1144 EXPORT_SYMBOL(distribute_txn_replay_handle);