4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2014, Intel Corporation.
27 * lustre/target/update_recovery.c
29 * This file implements the methods to handle update recovery.
31 * During DNE recovery, the recovery thread will redo the operation according
32 * to the transaction number, and these replays are either from client replay reqs
33 * or update replay records(for distribute transaction) in the update log.
34 * For distribute transaction replay, the replay thread will call
35 * distribute_txn_replay_handle() to handle the updates.
37 * After the Master MDT restarts, it will retrieve the update records from all
38 * of MDTs, for each distributed operation, it will check updates on all MDTs,
39 * if some update records are missing on some MDTs, the replay thread will redo
40 * updates on these MDTs.
42 * Author: Di Wang <di.wang@intel.com>
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
54 * Lookup distribute_txn_replay req
56 * Lookup distribute_txn_replay in the replay list by batchid.
57 * It is assumed the list has been locked before calling this function.
59 * \param[in] tdtd distribute_txn_data, which holds the replay
61 * \param[in] batchid batchid used by lookup.
63 * \retval pointer of the replay if succeeds.
64 * \retval NULL if can not find it.
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
/* Linear scan of the caller-locked replay list, matching on the batchid
 * stored in each req's update record.
 * NOTE(review): source appears partially elided here (original line numbers
 * skip); verify trailing break/return against upstream update_recovery.c. */
69 struct distribute_txn_replay_req *tmp;
70 struct distribute_txn_replay_req *dtrq = NULL;
72 list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73 if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
82 * insert distribute txn replay req
84 * Insert distribute txn replay to the replay list, and it assumes the
85 * list has been locked. Note: the replay list is a sorted list, which
86 * is sorted by master transno. It is assumed the replay list has been
87 * locked before calling this function.
89 * \param[in] tdtd target distribute txn data where replay list is
90 * \param[in] new distribute txn replay to be inserted
92 * \retval 0 if insertion succeeds
93 * \retval EEXIST if the dtrq already exists
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96 struct distribute_txn_replay_req *new)
/* Walk the sorted replay list from the tail (largest master transno)
 * looking for the insertion point for \a new. */
98 struct distribute_txn_replay_req *iter;
100 list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
/* iter sorts strictly after new: keep walking toward the head */
101 if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
102 new->dtrq_lur->lur_update_rec.ur_master_transno)
/* Same transno AND same batchid: the req already exists in the list
 * (duplicate insertion; see \retval EEXIST in the header comment) */
105 /* If there are multiple replay req with same transno, then
106 * sort them with batchid */
107 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
108 new->dtrq_lur->lur_update_rec.ur_master_transno &&
109 iter->dtrq_lur->lur_update_rec.ur_batchid ==
110 new->dtrq_lur->lur_update_rec.ur_batchid)
113 /* If there are multiple replay req with same transno, then
114 * sort them with batchid */
115 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
116 new->dtrq_lur->lur_update_rec.ur_master_transno &&
117 iter->dtrq_lur->lur_update_rec.ur_batchid >
118 new->dtrq_lur->lur_update_rec.ur_batchid)
/* insert after iter (list_add on iter's node keeps sort order) */
121 list_add(&new->dtrq_list, &iter->dtrq_list);
/* Not linked anywhere above: new sorts before every entry, put it at
 * the head of the replay list */
125 if (list_empty(&new->dtrq_list))
126 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
132 * create distribute txn replay req
134 * Allocate distribute txn replay req according to the update records.
136 * \param[in] tdtd target distribute txn data where replay list is.
137 * \param[in] record update records from the update log.
139 * \retval the pointer of distribute txn replay req if
140 * the creation succeeds.
141 * \retval NULL if the creation fails.
143 static struct distribute_txn_replay_req *
144 dtrq_create(struct llog_update_record *lur)
/* Allocate a replay req and take a private copy of the whole update
 * record, then initialize its lock and list heads. */
146 struct distribute_txn_replay_req *new;
150 RETURN(ERR_PTR(-ENOMEM));
/* size the private copy from the record itself */
152 new->dtrq_lur_size = llog_update_record_size(lur);
153 OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
154 if (new->dtrq_lur == NULL) {
156 RETURN(ERR_PTR(-ENOMEM));
159 memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
161 spin_lock_init(&new->dtrq_sub_list_lock);
162 INIT_LIST_HEAD(&new->dtrq_sub_list);
163 INIT_LIST_HEAD(&new->dtrq_list);
169 * Lookup distribute sub replay
171 * Lookup distribute sub replay in the sub list of distribute_txn_replay by
174 * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
175 * \param[in] mdt_index the mdt_index as the key of lookup
177 * \retval the pointer of sub replay if it can be found.
178 * \retval NULL if it can not find.
180 struct distribute_txn_replay_req_sub *
181 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
/* Linear scan of dtrq's sub list keyed by MDT index; caller is expected
 * to hold dtrq_sub_list_lock (see callers). */
183 struct distribute_txn_replay_req_sub *dtrqs = NULL;
184 struct distribute_txn_replay_req_sub *tmp;
186 list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
187 if (tmp->dtrqs_mdt_index == mdt_index) {
196 * Try to add cookie to sub distribute txn request
198 * Check if the update log cookie has been added to the request, if not,
199 * add it to the dtrqs_cookie_list.
201 * \param[in] dtrqs sub replay req where cookies to be added.
202 * \param[in] cookie cookie to be added.
204 * \retval 0 if adding the cookie succeeds.
205 * \retval negative errno if adding fails.
207 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
208 struct llog_cookie *cookie)
/* Allocate a cookie holder, copy the llog cookie by value and link it
 * onto the sub req's cookie list. */
210 struct sub_thandle_cookie *new;
216 INIT_LIST_HEAD(&new->stc_list);
217 new->stc_cookie = *cookie;
218 /* Note: only single thread will access one sub_request each time,
219 * so no need lock here */
220 list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
226 * Insert distribute txn sub req replay
228 * Allocate sub replay req and insert distribute txn replay list.
230 * \param[in] dtrq d to be added
231 * \param[in] cookie the cookie of the update record
232 * \param[in] mdt_index the mdt_index of the update record
234 * \retval 0 if the adding succeeds.
235 * \retval negative errno if the adding fails.
238 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
239 struct llog_cookie *cookie,
/* First check (under lock) whether a sub req for this MDT already
 * exists; if so only the cookie needs to be recorded. */
242 struct distribute_txn_replay_req_sub *dtrqs = NULL;
243 struct distribute_txn_replay_req_sub *new;
247 spin_lock(&dtrq->dtrq_sub_list_lock);
248 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
249 spin_unlock(&dtrq->dtrq_sub_list_lock);
251 rc = dtrq_sub_add_cookie(dtrqs, cookie);
/* Otherwise allocate a new sub req for mdt_index ... */
259 INIT_LIST_HEAD(&new->dtrqs_list);
260 INIT_LIST_HEAD(&new->dtrqs_cookie_list);
261 new->dtrqs_mdt_index = mdt_index;
/* ... and re-check under the lock in case another thread inserted one
 * concurrently; only link our copy if the lookup still misses. */
262 spin_lock(&dtrq->dtrq_sub_list_lock);
263 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
265 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
270 spin_unlock(&dtrq->dtrq_sub_list_lock);
272 rc = dtrq_sub_add_cookie(dtrqs, cookie);
278 * append updates to the current replay updates
280 * Append more updates to the existent replay update. And this is only
281 * used when combining multiple updates into one large updates during
284 * \param[in] dtrq the update replay request where the new update
285 * records will be added.
286 * \param[in] lur the new update record.
288 * \retval 0 if appending succeeds.
289 * \retval negative errno if appending fails.
291 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
292 struct update_records *record)
/* Grow dtrq's copied update record by appending \a record's ops/params.
 * Only the record whose ur_index is exactly current+1 is accepted, so
 * each continuation chunk is appended at most once. */
294 struct llog_update_record *new_lur;
295 size_t lur_size = dtrq->dtrq_lur_size;
299 /* Because several threads might retrieve the same records from
300 * different targets, and we only need one copy of records. So
301 * we will check if the records is in the next one, if not, just
303 spin_lock(&dtrq->dtrq_sub_list_lock);
304 if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
305 spin_unlock(&dtrq->dtrq_sub_list_lock);
/* claim this index now so concurrent appenders of the same chunk bail */
308 dtrq->dtrq_lur->lur_update_rec.ur_index++;
309 spin_unlock(&dtrq->dtrq_sub_list_lock);
311 lur_size += update_records_size(record);
312 OBD_ALLOC_LARGE(new_lur, lur_size);
313 if (new_lur == NULL) {
/* allocation failed: undo the index claim taken above */
314 spin_lock(&dtrq->dtrq_sub_list_lock);
315 dtrq->dtrq_lur->lur_update_rec.ur_index--;
316 spin_unlock(&dtrq->dtrq_sub_list_lock);
320 /* Copy the old and new records to the new allocated buffer */
321 memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
322 ptr = (char *)&new_lur->lur_update_rec +
323 update_records_size(&new_lur->lur_update_rec);
/* append only the ops/params payload of the new record, skipping its
 * update_records header (everything before ur_ops) */
324 memcpy(ptr, &record->ur_ops,
325 update_records_size(record) -
326 offsetof(struct update_records, ur_ops));
328 new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
329 new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
330 new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
332 /* Replace the records */
333 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
334 dtrq->dtrq_lur = new_lur;
335 dtrq->dtrq_lur_size = lur_size;
336 dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
337 update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
342 * Insert update records to the replay list.
344 * Allocate distribute txn replay req and insert it into the replay
345 * list, then insert the update records into the replay req.
347 * \param[in] tdtd distribute txn replay data where the replay list
349 * \param[in] record the update record
350 * \param[in] cookie cookie of the record
351 * \param[in] index mdt index of the record
353 * \retval 0 if the adding succeeds.
354 * \retval negative errno if the adding fails.
357 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
358 struct llog_update_record *lur,
359 struct llog_cookie *cookie,
/* Top-level entry: find-or-create the replay req for this batchid, then
 * record which MDT (mdt_index) contributed this update log record. */
362 struct distribute_txn_replay_req *dtrq;
363 struct update_records *record = &lur->lur_update_rec;
367 CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
368 " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
369 record->ur_batchid, record->ur_master_transno, mdt_index);
372 /* First try to build the replay update request with the records */
373 spin_lock(&tdtd->tdtd_replay_list_lock);
374 dtrq = dtrq_lookup(tdtd, record->ur_batchid);
375 spin_unlock(&tdtd->tdtd_replay_list_lock);
377 /* If the transno in the update record is 0, it means the
378 * update are from master MDT, and we will use the master
379 * last committed transno as its batchid. Note: if it got
380 * the records from the slave later, it needs to update
381 * the batchid by the transno in slave update log (see below) */
382 dtrq = dtrq_create(lur);
384 RETURN(PTR_ERR(dtrq));
386 if (record->ur_master_transno == 0)
387 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
388 tdtd->tdtd_lut->lut_last_transno;
389 spin_lock(&tdtd->tdtd_replay_list_lock);
390 rc = dtrq_insert(tdtd, dtrq);
391 spin_unlock(&tdtd->tdtd_replay_list_lock);
393 /* Some one else already add the record */
398 struct update_records *dtrq_rec;
400 /* If the master transno in update header is not
401 * matched with the one in the record, then it means
402 * the dtrq is originally created by master record,
403 * and we need update master transno and reposition
404 * the dtrq(by master transno). */
405 dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
406 if (record->ur_master_transno != 0 &&
407 dtrq_rec->ur_master_transno != record->ur_master_transno) {
/* adopt the authoritative transno, then re-sort the dtrq */
408 dtrq_rec->ur_master_transno = record->ur_master_transno;
409 spin_lock(&tdtd->tdtd_replay_list_lock);
410 list_del_init(&dtrq->dtrq_list);
411 rc = dtrq_insert(tdtd, dtrq);
412 spin_unlock(&tdtd->tdtd_replay_list_lock);
417 /* This is a partial update records, let's try to append
418 * the record to the current replay request */
419 if (record->ur_flags & UPDATE_RECORD_CONTINUE)
420 rc = dtrq_append_updates(dtrq, record);
423 /* Then create and add sub update request */
424 rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
428 EXPORT_SYMBOL(insert_update_records_to_replay_list);
431 * Dump updates of distribute txns.
433 * Output all of recovery updates in the distribute txn list to the
436 * \param[in] tdtd distribute txn data where all of distribute txn
438 * \param[in] mask debug mask
440 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
/* Dump every queued replay req's update record at debug level \a mask,
 * holding the replay list lock for the whole walk. */
442 struct distribute_txn_replay_req *dtrq;
444 spin_lock(&tdtd->tdtd_replay_list_lock);
445 list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
446 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
448 spin_unlock(&tdtd->tdtd_replay_list_lock);
450 EXPORT_SYMBOL(dtrq_list_dump);
453 * Destroy distribute txn replay req
455 * Destroy distribute txn replay req and all of subs.
457 * \param[in] dtrq distribute txn replqy req to be destroyed.
459 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
/* Free a replay req: the req must already be unlinked from the replay
 * list (asserted below); tear down all sub reqs and their cookies, then
 * release the copied update record. */
461 struct distribute_txn_replay_req_sub *dtrqs;
462 struct distribute_txn_replay_req_sub *tmp;
464 LASSERT(list_empty(&dtrq->dtrq_list));
465 spin_lock(&dtrq->dtrq_sub_list_lock);
466 list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
467 struct sub_thandle_cookie *stc;
/* NOTE(review): inner 'tmp' intentionally shadows the outer iterator;
 * safe here since the outer loop doesn't use tmp inside the body */
468 struct sub_thandle_cookie *tmp;
470 list_del(&dtrqs->dtrqs_list);
471 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
473 list_del(&stc->stc_list);
478 spin_unlock(&dtrq->dtrq_sub_list_lock);
480 if (dtrq->dtrq_lur != NULL)
481 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
485 EXPORT_SYMBOL(dtrq_destroy);
488 * Destroy all of replay req.
490 * Destroy all of replay req in the replay list.
492 * \param[in] tdtd target distribute txn data where the replay list is.
494 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
/* Drain the replay list under its lock, unlinking each req
 * (list_del_init satisfies dtrq_destroy's empty-list assertion). */
496 struct distribute_txn_replay_req *dtrq;
497 struct distribute_txn_replay_req *tmp;
499 spin_lock(&tdtd->tdtd_replay_list_lock);
500 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
502 list_del_init(&dtrq->dtrq_list);
505 spin_unlock(&tdtd->tdtd_replay_list_lock);
507 EXPORT_SYMBOL(dtrq_list_destroy);
510 * Get next req in the replay list
512 * Get next req needs to be replayed, since it is a sorted list
513 * (by master MDT transno)
515 * \param[in] tdtd distribute txn data where the replay list is
517 * \retval the pointer of update recovery header
519 struct distribute_txn_replay_req *
520 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
/* Pop the head of the sorted replay list (lowest master transno), or
 * return NULL when the list is empty. The req is unlinked so ownership
 * transfers to the caller. */
522 struct distribute_txn_replay_req *dtrq = NULL;
524 spin_lock(&tdtd->tdtd_replay_list_lock);
525 if (!list_empty(&tdtd->tdtd_replay_list)) {
526 dtrq = list_entry(tdtd->tdtd_replay_list.next,
527 struct distribute_txn_replay_req, dtrq_list);
528 list_del_init(&dtrq->dtrq_list);
530 spin_unlock(&tdtd->tdtd_replay_list_lock);
534 EXPORT_SYMBOL(distribute_txn_get_next_req);
537 * Get next transno in the replay list, because this is the sorted
538 * list, so it will return the transno of next req in the list.
540 * \param[in] tdtd distribute txn data where the replay list is
542 * \retval the transno of next update in the list
544 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
/* Peek (without unlinking) at the head of the sorted replay list and
 * return its master transno. */
546 struct distribute_txn_replay_req *dtrq = NULL;
549 spin_lock(&tdtd->tdtd_replay_list_lock);
550 if (!list_empty(&tdtd->tdtd_replay_list)) {
551 dtrq = list_entry(tdtd->tdtd_replay_list.next,
552 struct distribute_txn_replay_req, dtrq_list);
553 transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
555 spin_unlock(&tdtd->tdtd_replay_list_lock);
557 CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
558 tdtd->tdtd_lut->lut_obd->obd_name, transno);
561 EXPORT_SYMBOL(distribute_txn_get_next_transno);
564 * Check if the update of one object is committed
566 * Check whether the update for the object is committed by checking whether
567 * the correspondent sub exists in the replay req. If it is committed, mark
568 * the committed flag in correspondent the sub thandle.
570 * \param[in] env execution environment
571 * \param[in] dtrq replay request
572 * \param[in] dt_obj object for the update
573 * \param[in] top_th top thandle
574 * \param[in] sub_th sub thandle which the update belongs to
576 * \retval 1 if the update is not committed.
577 * \retval 0 if the update is committed.
578 * \retval negative errno if some other failures happen.
580 static int update_is_committed(const struct lu_env *env,
581 struct distribute_txn_replay_req *dtrq,
582 struct dt_object *dt_obj,
583 struct top_thandle *top_th,
584 struct sub_thandle *st)
/* Resolve which MDT owns dt_obj's FID, then decide whether the sub
 * transaction on that MDT is already committed (a sub replay record
 * exists for that MDT, or the whole top thandle is committed). */
586 struct seq_server_site *seq_site;
587 const struct lu_fid *fid = lu_object_fid(&dt_obj->do_lu);
588 struct distribute_txn_replay_req_sub *dtrqs;
592 if (st->st_sub_th != NULL)
595 if (st->st_committed)
598 seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
/* update-log objects encode the MDT index in the FID oid */
599 if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
600 mdt_index = fid_oid(fid);
/* sequences not managed by FLDB are local to this node */
601 } else if (!fid_seq_in_fldb(fid_seq(fid))) {
602 mdt_index = seq_site->ss_node_id;
604 struct lu_server_fld *fld;
605 struct lu_seq_range range = {0};
/* otherwise ask the FLD server which MDT holds this sequence */
608 fld = seq_site->ss_server_fld;
609 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
610 LASSERT(fld->lsf_seq_lookup != NULL);
611 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
615 mdt_index = range.lsr_index;
618 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
619 if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
620 st->st_committed = 1;
622 struct sub_thandle_cookie *stc;
623 struct sub_thandle_cookie *tmp;
/* hand the update-log cookies over to the sub thandle so they can
 * be cancelled after the distributed txn is fully committed */
625 list_for_each_entry_safe(stc, tmp,
626 &dtrqs->dtrqs_cookie_list,
628 list_move(&stc->stc_list, &st->st_cookie_list);
633 CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
640 * Implementation of different update methods for update recovery.
642 * These following functions update_recovery_$(update_name) implement
643 * different updates recovery methods. They will extract the parameters
644 * from the common parameters area and call correspondent dt API to redo
647 * \param[in] env execution environment
648 * \param[in] op update operation to be replayed
649 * \param[in] params common update parameters which holds all parameters
651 * \param[in] th transaction handle
652 * \param[in] declare indicate it will do declare or real execution, true
653 * means declare, false means real execution
655 * \retval 0 if it succeeds.
656 * \retval negative errno if it fails.
658 static int update_recovery_create(const struct lu_env *env,
659 struct dt_object *dt_obj,
660 const struct update_op *op,
661 const struct update_params *params,
662 struct thandle_exec_args *ta,
/* Replay an OUT_CREATE: decode the wire obdo parameter into lu_attr,
 * derive the object format from the mode, and queue out_tx_create. */
665 struct update_thread_info *uti = update_env_info(env);
666 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
667 struct lu_attr *attr = &uti->uti_attr;
669 struct obdo *lobdo = &uti->uti_obdo;
670 struct dt_object_format dof;
672 unsigned int param_count;
/* nothing to do if the object already exists (create was committed) */
676 if (dt_object_exists(dt_obj))
679 param_count = lur->lur_update_rec.ur_param_count;
680 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
684 if (size != sizeof(*wobdo))
/* swab the on-wire obdo if the llog record came from the other endian */
687 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
688 lustre_swab_obdo(wobdo);
690 lustre_get_wire_obdo(NULL, lobdo, wobdo);
691 la_from_obdo(attr, lobdo, lobdo->o_valid);
693 dof.dof_type = dt_mode_to_dft(attr->la_mode);
695 rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
701 static int update_recovery_destroy(const struct lu_env *env,
702 struct dt_object *dt_obj,
703 const struct update_op *op,
704 const struct update_params *params,
705 struct thandle_exec_args *ta,
/* Replay an OUT_DESTROY: no parameters needed, queue out_tx_destroy. */
711 rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
716 static int update_recovery_ref_add(const struct lu_env *env,
717 struct dt_object *dt_obj,
718 const struct update_op *op,
719 const struct update_params *params,
720 struct thandle_exec_args *ta,
/* Replay an OUT_REF_ADD (nlink increment): queue out_tx_ref_add. */
726 rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
731 static int update_recovery_ref_del(const struct lu_env *env,
732 struct dt_object *dt_obj,
733 const struct update_op *op,
734 const struct update_params *params,
735 struct thandle_exec_args *ta,
/* Replay an OUT_REF_DEL (nlink decrement): queue out_tx_ref_del. */
741 rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
746 static int update_recovery_attr_set(const struct lu_env *env,
747 struct dt_object *dt_obj,
748 const struct update_op *op,
749 const struct update_params *params,
750 struct thandle_exec_args *ta,
/* Replay an OUT_ATTR_SET: decode the wire obdo parameter into lu_attr
 * (swabbing if needed) and queue out_tx_attr_set. */
753 struct update_thread_info *uti = update_env_info(env);
754 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
756 struct obdo *lobdo = &uti->uti_obdo;
757 struct lu_attr *attr = &uti->uti_attr;
759 unsigned int param_count;
763 param_count = lur->lur_update_rec.ur_param_count;
764 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
768 if (size != sizeof(*wobdo))
/* swab the on-wire obdo if the llog record came from the other endian */
771 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
772 lustre_swab_obdo(wobdo);
774 lustre_get_wire_obdo(NULL, lobdo, wobdo);
775 la_from_obdo(attr, lobdo, lobdo->o_valid);
777 rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
782 static int update_recovery_xattr_set(const struct lu_env *env,
783 struct dt_object *dt_obj,
784 const struct update_op *op,
785 const struct update_params *params,
786 struct thandle_exec_args *ta,
/* Replay an OUT_XATTR_SET: parameters are (name, value buffer, flags);
 * queue out_tx_xattr_set with the decoded triple. */
789 struct update_thread_info *uti = update_env_info(env);
798 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
/* param 0: xattr name */
799 name = update_params_get_param_buf(params,
800 op->uop_params_off[0],
/* param 1: xattr value */
805 buf = update_params_get_param_buf(params,
806 op->uop_params_off[1],
811 uti->uti_buf.lb_buf = buf;
812 uti->uti_buf.lb_len = (size_t)size;
/* param 2: 32-bit little-endian flags word */
814 buf = update_params_get_param_buf(params, op->uop_params_off[2],
818 if (size != sizeof(fl))
821 fl = le32_to_cpu(*(int *)buf);
823 rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
829 static int update_recovery_index_insert(const struct lu_env *env,
830 struct dt_object *dt_obj,
831 const struct update_op *op,
832 const struct update_params *params,
833 struct thandle_exec_args *ta,
/* Replay an OUT_INDEX_INSERT: parameters are (name key, FID, type);
 * build a dt_insert_rec and queue out_tx_index_insert. */
836 struct update_thread_info *uti = update_env_info(env);
846 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
/* param 0: entry name (index key) */
847 name = update_params_get_param_buf(params, op->uop_params_off[0],
/* param 1: target FID, stored little-endian in the log */
852 fid = update_params_get_param_buf(params, op->uop_params_off[1],
856 if (size != sizeof(*fid))
859 fid_le_to_cpu(fid, fid);
/* param 2: 32-bit little-endian entry type */
861 ptype = update_params_get_param_buf(params, op->uop_params_off[2],
865 if (size != sizeof(*ptype))
867 type = le32_to_cpu(*ptype);
/* the object must support the index (directory) API */
869 if (dt_try_as_dir(env, dt_obj) == 0)
872 uti->uti_rec.rec_fid = fid;
873 uti->uti_rec.rec_type = type;
875 rc = out_tx_index_insert(env, dt_obj,
876 (const struct dt_rec *)&uti->uti_rec,
877 (const struct dt_key *)name, ta, th,
883 static int update_recovery_index_delete(const struct lu_env *env,
884 struct dt_object *dt_obj,
885 const struct update_op *op,
886 const struct update_params *params,
887 struct thandle_exec_args *ta,
/* Replay an OUT_INDEX_DELETE: single name-key parameter; queue
 * out_tx_index_delete on the directory object. */
890 struct update_thread_info *uti = update_env_info(env);
897 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
898 name = update_params_get_param_buf(params, op->uop_params_off[0],
/* the object must support the index (directory) API */
903 if (dt_try_as_dir(env, dt_obj) == 0)
906 rc = out_tx_index_delete(env, dt_obj,
907 (const struct dt_key *)name, ta, th, NULL, 0);
912 static int update_recovery_write(const struct lu_env *env,
913 struct dt_object *dt_obj,
914 const struct update_op *op,
915 const struct update_params *params,
916 struct thandle_exec_args *ta,
/* Replay an OUT_WRITE: parameters are (data buffer, 64-bit LE offset);
 * queue out_tx_write. */
919 struct update_thread_info *uti = update_env_info(env);
927 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
/* param 0: data to write */
928 buf = update_params_get_param_buf(params, op->uop_params_off[0],
933 uti->uti_buf.lb_buf = buf;
934 uti->uti_buf.lb_len = size;
/* param 1: file offset, little-endian __u64 */
936 buf = update_params_get_param_buf(params, op->uop_params_off[1],
941 pos = le64_to_cpu(*(__u64 *)buf);
943 rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
949 static int update_recovery_xattr_del(const struct lu_env *env,
950 struct dt_object *dt_obj,
951 const struct update_op *op,
952 const struct update_params *params,
953 struct thandle_exec_args *ta,
/* Replay an OUT_XATTR_DEL: single xattr-name parameter; queue
 * out_tx_xattr_del. */
956 struct update_thread_info *uti = update_env_info(env);
963 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
964 name = update_params_get_param_buf(params, op->uop_params_off[0],
969 rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
975 * Execute updates in the update replay records
977 * Declare distribute txn replay by update records and add the updates
978 * to the execution list. Note: it will check if the update has been
979 * committed, and only execute the updates if it is not committed to
982 * \param[in] env execution environment
983 * \param[in] tdtd distribute txn replay data which hold all of replay
984 * reqs and all replay parameters.
985 * \param[in] dtrq distribute transaction replay req.
986 * \param[in] ta thandle execute args.
988 * \retval 0 if declare succeeds.
989 * \retval negative errno if declare fails.
991 static int update_recovery_exec(const struct lu_env *env,
992 struct target_distribute_txn_data *tdtd,
993 struct distribute_txn_replay_req *dtrq,
994 struct thandle_exec_args *ta)
/* Walk every update op in the replay record; for each op, locate the
 * target object, resolve/create its sub thandle, skip ops whose sub txn
 * is already committed, and dispatch to the matching replay handler.
 * NOTE(review): source appears partially elided (line numbers skip);
 * verify case labels/GOTOs against upstream update_recovery.c. */
996 struct llog_update_record *lur = dtrq->dtrq_lur;
997 struct update_records *records = &lur->lur_update_rec;
998 struct update_ops *ops = &records->ur_ops;
999 struct update_params *params = update_records_get_params(records);
1000 struct top_thandle *top_th = container_of(ta->ta_handle,
1003 struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1004 struct update_op *op;
1009 /* These records have been swabbed in llog_cat_process() */
1010 for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1011 i++, op = update_op_next_op(op)) {
1012 struct lu_fid *fid = &op->uop_fid;
1013 struct dt_object *dt_obj;
1014 struct dt_object *sub_dt_obj;
1015 struct dt_device *sub_dt;
1016 struct sub_thandle *st;
1018 if (op->uop_type == OUT_NOOP)
1021 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1022 if (IS_ERR(dt_obj)) {
1023 rc = PTR_ERR(dt_obj);
1026 sub_dt_obj = dt_object_child(dt_obj);
1028 /* Create sub thandle if not */
1029 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1030 st = lookup_sub_thandle(tmt, sub_dt);
1032 st = create_sub_thandle(tmt, sub_dt);
1034 GOTO(next, rc = PTR_ERR(st));
1037 /* check if updates on the OSD/OSP are committed */
1038 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1040 /* If this is committed, goto next */
1046 /* Create thandle for sub thandle if needed */
1047 if (st->st_sub_th == NULL) {
1048 rc = sub_thandle_trans_create(env, top_th, st);
1053 CDEBUG(D_HA, "replay %uth update\n", i);
/* dispatch on the update type recorded in the log */
1054 switch (op->uop_type) {
1056 rc = update_recovery_create(env, sub_dt_obj,
1061 rc = update_recovery_destroy(env, sub_dt_obj,
1066 rc = update_recovery_ref_add(env, sub_dt_obj,
1071 rc = update_recovery_ref_del(env, sub_dt_obj,
1076 rc = update_recovery_attr_set(env, sub_dt_obj,
1081 rc = update_recovery_xattr_set(env, sub_dt_obj,
1085 case OUT_INDEX_INSERT:
1086 rc = update_recovery_index_insert(env, sub_dt_obj,
1090 case OUT_INDEX_DELETE:
1091 rc = update_recovery_index_delete(env, sub_dt_obj,
1096 rc = update_recovery_write(env, sub_dt_obj,
1101 rc = update_recovery_xattr_del(env, sub_dt_obj,
1106 CERROR("Unknown update type %u\n", (__u32)op->uop_type);
/* drop the reference taken by dt_locate() above */
1111 lu_object_put(env, &dt_obj->do_lu);
/* propagate the final status into the top thandle */
1116 ta->ta_handle->th_result = rc;
1121 * redo updates on MDT if needed.
1123 * During DNE recovery, the recovery thread (target_recovery_thread) will call
1124 * this function to replay distribute txn updates on all MDTs. It only replay
1125 * updates on the MDT where the update record is missing.
1127 * If the update already exists on the MDT, then it does not need replay the
1128 * updates on that MDT, and only mark the sub transaction has been committed
1131 * \param[in] env execution environment
1132 * \param[in] tdtd target distribute txn data, which holds the replay list
1133 * and all parameters needed by replay process.
1134 * \param[in] dtrq distribute txn replay req.
1136 * \retval 0 if replay succeeds.
1137 * \retval negative errno if replay failes.
1139 int distribute_txn_replay_handle(struct lu_env *env,
1140 struct target_distribute_txn_data *tdtd,
1141 struct distribute_txn_replay_req *dtrq)
/* Replay one distributed transaction: set up a server session, build a
 * top thandle + multiple-thandle, declare the updates via
 * update_recovery_exec(), then start the txn and execute (undoing on
 * failure where an undo_fn exists) before stopping it.
 * NOTE(review): source appears partially elided (line numbers skip);
 * verify error paths/labels against upstream update_recovery.c. */
1143 struct update_records *records = &dtrq->dtrq_lur->lur_update_rec;
1144 struct thandle_exec_args *ta;
1145 struct lu_context session_env;
1146 struct thandle *th = NULL;
1147 struct top_thandle *top_th;
1148 struct top_multiple_thandle *tmt;
1149 struct thandle_update_records *tur = NULL;
1154 /* initialize session, it is needed for the handler of target */
1155 rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1157 CERROR("%s: failure to initialize session: rc = %d\n",
1158 tdtd->tdtd_lut->lut_obd->obd_name, rc);
1161 lu_context_enter(&session_env);
1162 env->le_ses = &session_env;
1164 update_records_dump(records, D_HA, true);
1165 th = top_trans_create(env, NULL);
1167 GOTO(exit_session, rc = PTR_ERR(th));
1169 ta = &update_env_info(env)->uti_tea;
1172 update_env_info(env)->uti_dtrq = dtrq;
1173 /* Create distribute transaction structure for this top thandle */
1174 top_th = container_of(th, struct top_thandle, tt_super);
1175 rc = top_trans_create_tmt(env, top_th);
1177 GOTO(stop_trans, rc);
1181 /* check if the distribute transaction has been committed */
1182 tmt = top_th->tt_multiple_thandle;
1183 tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1184 tmt->tmt_batchid = records->ur_batchid;
1185 tgt_th_info(env)->tti_transno = records->ur_master_transno;
/* batchids <= the globally committed batchid need no re-execution */
1187 if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1188 tmt->tmt_committed = 1;
1190 rc = update_recovery_exec(env, tdtd, dtrq, ta);
1192 GOTO(stop_trans, rc);
1194 /* If no updates are needed to be replayed, then
1195 * mark this records as committed, so commit thread
1196 * distribute_txn_commit_thread() will delete the
1198 if (ta->ta_argno == 0)
1199 tmt->tmt_committed = 1;
/* attach the update records to the thandle so commit processing can
 * reference (and later cancel) them */
1201 tur = &update_env_info(env)->uti_tur;
1202 tur->tur_update_records = dtrq->dtrq_lur;
1203 tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1204 tur->tur_update_params = NULL;
1205 tur->tur_update_param_count = 0;
1206 tmt->tmt_update_records = tur;
1208 distribute_txn_insert_by_batchid(tmt);
1209 rc = top_trans_start(env, NULL, th);
1211 GOTO(stop_trans, rc);
/* execute every declared update argument in order */
1213 for (i = 0; i < ta->ta_argno; i++) {
1214 struct tx_arg *ta_arg;
1215 struct dt_object *dt_obj;
1216 struct dt_device *sub_dt;
1217 struct sub_thandle *st;
1219 ta_arg = ta->ta_args[i];
1220 dt_obj = ta_arg->object;
1222 LASSERT(tmt->tmt_committed == 0);
1223 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1224 st = lookup_sub_thandle(tmt, sub_dt);
1225 LASSERT(st != NULL);
1226 LASSERT(st->st_sub_th != NULL);
1227 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1229 if (unlikely(rc < 0)) {
1230 CDEBUG(D_HA, "error during execution of #%u from"
1231 " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1232 ta->ta_args[i]->line, rc);
/* roll back already-executed args that registered an undo_fn;
 * args without one can only be reported (see -ENOTSUPP below) */
1234 if (ta->ta_args[i]->undo_fn != NULL) {
1235 dt_obj = ta->ta_args[i]->object;
1237 lu2dt_dev(dt_obj->do_lu.lo_dev);
1238 st = lookup_sub_thandle(tmt, sub_dt);
1239 LASSERT(st != NULL);
1240 LASSERT(st->st_sub_th != NULL);
1242 ta->ta_args[i]->undo_fn(env,
1246 CERROR("%s: undo for %s:%d: rc = %d\n",
1247 dt_obd_name(ta->ta_handle->th_dev),
1248 ta->ta_args[i]->file,
1249 ta->ta_args[i]->line, -ENOTSUPP);
1254 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1255 dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1261 rc = top_trans_stop(env, tdtd->tdtd_dt, th);
/* release object references taken during declaration */
1262 for (i = 0; i < ta->ta_argno; i++) {
1263 if (ta->ta_args[i]->object != NULL) {
1264 lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1265 ta->ta_args[i]->object = NULL;
/* detach the borrowed record pointer before the dtrq may be freed */
1270 tur->tur_update_records = NULL;
1272 lu_context_exit(&session_env);
1273 lu_context_fini(&session_env);
1276 EXPORT_SYMBOL(distribute_txn_replay_handle);