1 // SPDX-License-Identifier: GPL-2.0
4 * Copyright (c) 2015, 2017, Intel Corporation.
8 * This file implements the methods to handle the update recovery.
10 * During DNE recovery, the recovery thread will redo the operations according
11 * to the transaction number, and these replays come either from client replay
12 * reqs or from update replay records (for distributed transactions) in the update log.
13 * For distribute transaction replay, the replay thread will call
14 * distribute_txn_replay_handle() to handle the updates.
16 * After the Master MDT restarts, it will retrieve the update records from all
17 * of MDTs, for each distributed operation, it will check updates on all MDTs,
18 * if some updates records are missing on some MDTs, the replay thread will redo
19 * updates on these MDTs.
21 * Author: Di Wang <di.wang@intel.com>
24 #define DEBUG_SUBSYSTEM S_CLASS
26 #include <lu_target.h>
27 #include <lustre_obdo.h>
28 #include <lustre_update.h>
29 #include <lustre_swab.h>
30 #include <md_object.h>
32 #include <obd_class.h>
34 #include "tgt_internal.h"
37 * Lookup distribute_txn_replay req
39 * Lookup distribute_txn_replay in the replay list by batchid.
40 * It is assumed the list has been locked before calling this function.
42 * \param[in] tdtd distribute_txn_data, which holds the replay
44 * \param[in] batchid batchid used by lookup.
46 * \retval pointer of the replay if succeeds.
47 * \retval NULL if can not find it.
49 static struct distribute_txn_replay_req *
50 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
/* Linear scan; caller must already hold tdtd_replay_list_lock (see above). */
52 struct distribute_txn_replay_req *tmp;
53 struct distribute_txn_replay_req *dtrq = NULL;
55 list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
56 if (tmp->dtrq_batchid == batchid) {
/* NOTE(review): remainder of the match/return path is not visible here. */
65 * insert distribute txn replay req
67 * Insert distribute txn replay to the replay list, and it assumes the
68 * list has been locked. Note: the replay list is a sorted list, which
69 * is sorted by master transno. It is assumed the replay list has been
70 * locked before calling this function.
72 * \param[in] tdtd target distribute txn data where replay list is
73 * \param[in] new distribute txn replay to be inserted
75 * \retval 0 if insertion succeeds
76 * \retval EEXIST if the dtrq already exists
78 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
79 struct distribute_txn_replay_req *new)
81 struct distribute_txn_replay_req *iter;
83 /* Check if the dtrq has been added to the list */
84 iter = dtrq_lookup(tdtd, new->dtrq_batchid);
/* Walk backwards so the common append-at-tail case terminates quickly;
 * insert after the last entry whose (transno, batchid) sorts below new. */
88 list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
89 if (iter->dtrq_master_transno > new->dtrq_master_transno)
92 /* If there are multiple replay reqs with the same transno, then
93 * sort them by batchid */
94 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
95 iter->dtrq_batchid > new->dtrq_batchid)
98 list_add(&new->dtrq_list, &iter->dtrq_list);
/* Nothing sorted below new: it becomes the new head of the list. */
102 if (list_empty(&new->dtrq_list))
103 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
109 * create distribute txn replay req
111 * Allocate distribute txn replay req according to the update records.
113 * \param[in] tdtd target distribute txn data where replay list is.
114 * \param[in] lur update record (llog wrapper) from the update log.
116 * \retval the pointer of distribute txn replay req if
117 * the creation succeeds.
118 * \retval ERR_PTR(-ENOMEM) if the creation fails.
120 static struct distribute_txn_replay_req *
121 dtrq_create(struct target_distribute_txn_data *tdtd,
122 struct llog_update_record *lur)
124 struct distribute_txn_replay_req *new;
128 RETURN(ERR_PTR(-ENOMEM));
/* Keep a private copy of the whole llog update record. */
130 new->dtrq_lur_size = llog_update_record_size(lur);
131 OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
132 if (new->dtrq_lur == NULL) {
134 RETURN(ERR_PTR(-ENOMEM));
137 memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
139 /* If the transno in the update record is 0, it means the
140 * update are from master MDT, and it will use the master
141 * last committed transno as its master transno. Later, if
142 * the update records are gotten from slave MDTs, then these
143 * transno will be replaced.
144 * See insert_update_records_to_replay_list(). */
145 if (lur->lur_update_rec.ur_master_transno == 0) {
146 new->dtrq_lur->lur_update_rec.ur_master_transno =
147 tdtd->tdtd_lut->lut_obd->obd_last_committed;
148 new->dtrq_master_transno =
149 tdtd->tdtd_lut->lut_obd->obd_last_committed;
151 new->dtrq_master_transno =
152 lur->lur_update_rec.ur_master_transno;
155 new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
157 spin_lock_init(&new->dtrq_sub_list_lock);
158 INIT_LIST_HEAD(&new->dtrq_sub_list);
159 INIT_LIST_HEAD(&new->dtrq_list);
165 * Lookup distribute sub replay
167 * Lookup distribute sub replay in the sub list of distribute_txn_replay by
170 * \param[in] dtrq the distribute txn replay req to lookup
171 * \param[in] mdt_index the mdt_index as the key of lookup
173 * \retval the pointer of sub replay if it can be found.
174 * \retval NULL if it can not find.
176 struct distribute_txn_replay_req_sub *
177 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
/* Linear scan keyed by MDT index; callers take dtrq_sub_list_lock
 * around this (see dtrq_sub_create_and_insert()). */
179 struct distribute_txn_replay_req_sub *dtrqs = NULL;
180 struct distribute_txn_replay_req_sub *tmp;
182 list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
183 if (tmp->dtrqs_mdt_index == mdt_index) {
/* NOTE(review): match/return path not visible in this view. */
192 * Try to add cookie to sub distribute txn request
194 * Check if the update log cookie has been added to the request, if not,
195 * add it to the dtrqs_cookie_list.
197 * \param[in] dtrqs sub replay req where cookies to be added.
198 * \param[in] cookie cookie to be added.
200 * \retval 0 if adding the cookie succeeds.
201 * \retval negative errno if adding fails.
203 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
204 struct llog_cookie *cookie)
206 struct sub_thandle_cookie *new;
212 INIT_LIST_HEAD(&new->stc_list);
/* Copy the cookie by value so the caller's storage can be reused. */
213 new->stc_cookie = *cookie;
214 /* Note: only single thread will access one sub_request each time,
215 * so no need lock here */
216 list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
222 * Insert distribute txn sub req replay
224 * Allocate sub replay req and insert distribute txn replay list.
226 * \param[in] dtrq replay req the sub req is to be added to
227 * \param[in] cookie the cookie of the update record
228 * \param[in] mdt_index the mdt_index of the update record
230 * \retval 0 if the adding succeeds.
231 * \retval negative errno if the adding fails.
234 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
235 struct llog_cookie *cookie,
238 struct distribute_txn_replay_req_sub *dtrqs = NULL;
239 struct distribute_txn_replay_req_sub *new;
/* Fast path: the sub req for this MDT may already exist. */
243 spin_lock(&dtrq->dtrq_sub_list_lock);
244 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
245 spin_unlock(&dtrq->dtrq_sub_list_lock);
247 rc = dtrq_sub_add_cookie(dtrqs, cookie);
255 INIT_LIST_HEAD(&new->dtrqs_list);
256 INIT_LIST_HEAD(&new->dtrqs_cookie_list);
257 new->dtrqs_mdt_index = mdt_index;
/* Re-check under the lock: another thread may have inserted the same
 * mdt_index between the unlocked lookup above and here. */
258 spin_lock(&dtrq->dtrq_sub_list_lock);
259 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
261 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
266 spin_unlock(&dtrq->dtrq_sub_list_lock);
268 rc = dtrq_sub_add_cookie(dtrqs, cookie);
274 * append updates to the current replay updates
276 * Append more updates to the existent replay update. And this is only
277 * used when combining multiple updates into one large update during
280 * \param[in] dtrq the update replay request where the new update
281 * records will be added.
282 * \param[in] record the new update record.
284 * \retval 0 if appending succeeds.
285 * \retval negative errno if appending fails.
287 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
288 struct update_records *record)
290 struct llog_update_record *new_lur;
291 size_t lur_size = dtrq->dtrq_lur_size;
295 /* Because several threads might retrieve the same records from
296 * different targets, and we only need one copy of records. So
297 * we will check if the records is in the next one, if not, just
299 spin_lock(&dtrq->dtrq_sub_list_lock);
300 if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
301 spin_unlock(&dtrq->dtrq_sub_list_lock);
/* Bump ur_index first (under the lock) so concurrent appenders of the
 * same record are rejected by the sequence check above. */
304 dtrq->dtrq_lur->lur_update_rec.ur_index++;
305 spin_unlock(&dtrq->dtrq_sub_list_lock);
307 lur_size += update_records_size(record);
308 OBD_ALLOC_LARGE(new_lur, lur_size);
309 if (new_lur == NULL) {
/* Allocation failed: roll the index back so the append can be retried. */
310 spin_lock(&dtrq->dtrq_sub_list_lock);
311 dtrq->dtrq_lur->lur_update_rec.ur_index--;
312 spin_unlock(&dtrq->dtrq_sub_list_lock);
316 /* Copy the old and new records to the new allocated buffer */
317 memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
318 ptr = (char *)&new_lur->lur_update_rec +
319 update_records_size(&new_lur->lur_update_rec);
/* Append only the ops/params payload; the record header is not repeated. */
320 memcpy(ptr, &record->ur_ops,
321 update_records_size(record) -
322 offsetof(struct update_records, ur_ops));
324 new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
325 new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
326 new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
328 /* Replace the records */
329 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
330 dtrq->dtrq_lur = new_lur;
331 dtrq->dtrq_lur_size = lur_size;
332 dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
333 update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
338 * Insert update records to the replay list.
340 * Allocate distribute txn replay req and insert it into the replay
341 * list, then insert the update records into the replay req.
343 * \param[in] tdtd distribute txn replay data where the replay list
345 * \param[in] lur the update record
346 * \param[in] cookie cookie of the record
347 * \param[in] mdt_index mdt index of the record
349 * \retval 0 if the adding succeeds.
350 * \retval negative errno if the adding fails.
353 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
354 struct llog_update_record *lur,
355 struct llog_cookie *cookie,
358 struct distribute_txn_replay_req *dtrq;
359 struct update_records *record = &lur->lur_update_rec;
360 bool replace_record = false;
364 CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
365 " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
366 record->ur_batchid, record->ur_master_transno, mdt_index);
368 /* Update batchid if necessary */
369 spin_lock(&tdtd->tdtd_batchid_lock);
370 if (record->ur_batchid >= tdtd->tdtd_batchid) {
371 CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
372 tdtd->tdtd_lut->lut_obd->obd_name,
373 tdtd->tdtd_batchid, record->ur_batchid);
/* Next locally generated batchid must be beyond any replayed one. */
374 tdtd->tdtd_batchid = record->ur_batchid + 1;
376 spin_unlock(&tdtd->tdtd_batchid_lock);
379 spin_lock(&tdtd->tdtd_replay_list_lock);
380 /* First try to build the replay update request with the records */
381 dtrq = dtrq_lookup(tdtd, record->ur_batchid);
/* Not found: create a new dtrq outside the lock, then re-take the
 * lock to insert it (dtrq_insert() detects a racing duplicate). */
383 spin_unlock(&tdtd->tdtd_replay_list_lock);
384 dtrq = dtrq_create(tdtd, lur);
386 RETURN(PTR_ERR(dtrq));
388 spin_lock(&tdtd->tdtd_replay_list_lock);
389 rc = dtrq_insert(tdtd, dtrq);
391 spin_unlock(&tdtd->tdtd_replay_list_lock);
398 /* If the master transno in update header is not
399 * matched with the one in the record, then it means
400 * the dtrq is originally created by master record,
401 * so we need update master transno and reposition
402 * the dtrq(by master transno) in the list and also
403 * replace update record */
404 if (record->ur_master_transno != 0 &&
405 dtrq->dtrq_master_transno != record->ur_master_transno &&
406 dtrq->dtrq_lur != NULL) {
407 list_del_init(&dtrq->dtrq_list);
408 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
409 record->ur_master_transno;
411 dtrq->dtrq_master_transno = record->ur_master_transno;
412 replace_record = true;
413 /* try to insert again */
414 rc = dtrq_insert(tdtd, dtrq);
416 spin_unlock(&tdtd->tdtd_replay_list_lock);
422 spin_unlock(&tdtd->tdtd_replay_list_lock);
424 /* Because there should be only thread access the update record, so
425 * we do not need lock here */
426 if (replace_record) {
427 /* Replace the update record and master transno */
428 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
429 dtrq->dtrq_lur = NULL;
430 dtrq->dtrq_lur_size = llog_update_record_size(lur);
431 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
432 if (dtrq->dtrq_lur == NULL)
435 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
438 /* This is a partial update records, let's try to append
439 * the record to the current replay request */
440 if (record->ur_flags & UPDATE_RECORD_CONTINUE)
441 rc = dtrq_append_updates(dtrq, record);
443 /* Then create and add sub update request */
444 rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
448 EXPORT_SYMBOL(insert_update_records_to_replay_list);
451 * Dump updates of distribute txns.
453 * Output all of recovery updates in the distribute txn list to the
456 * \param[in] tdtd distribute txn data where all of distribute txn
458 * \param[in] mask debug mask
460 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
462 struct distribute_txn_replay_req *dtrq;
/* Hold the list lock for the whole walk so entries cannot be removed
 * while they are being dumped. */
464 spin_lock(&tdtd->tdtd_replay_list_lock);
465 list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
466 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
468 spin_unlock(&tdtd->tdtd_replay_list_lock);
470 EXPORT_SYMBOL(dtrq_list_dump);
473 * Destroy distribute txn replay req
475 * Destroy distribute txn replay req and all of subs.
477 * \param[in] dtrq distribute txn replay req to be destroyed.
479 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
481 struct distribute_txn_replay_req_sub *dtrqs;
482 struct distribute_txn_replay_req_sub *tmp;
/* Caller must have unlinked the req from the replay list already. */
484 LASSERT(list_empty(&dtrq->dtrq_list));
485 CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
486 dtrq->dtrq_master_transno);
487 spin_lock(&dtrq->dtrq_sub_list_lock);
488 list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
489 struct sub_thandle_cookie *stc;
/* Inner 'tmp' intentionally shadows the outer sub-req iterator. */
490 struct sub_thandle_cookie *tmp;
492 list_del(&dtrqs->dtrqs_list);
493 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
495 list_del(&stc->stc_list);
500 spin_unlock(&dtrq->dtrq_sub_list_lock);
502 if (dtrq->dtrq_lur != NULL)
503 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
507 EXPORT_SYMBOL(dtrq_destroy);
510 * Destroy all of replay req.
512 * Destroy all of replay req in the replay list.
514 * \param[in] tdtd target distribute txn data where the replay list is.
516 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
518 struct distribute_txn_replay_req *dtrq;
519 struct distribute_txn_replay_req *tmp;
521 spin_lock(&tdtd->tdtd_replay_list_lock);
/* Drain the pending replay list ... */
522 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
524 list_del_init(&dtrq->dtrq_list);
/* ... and the already-finished list as well. */
527 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
529 list_del_init(&dtrq->dtrq_list);
532 spin_unlock(&tdtd->tdtd_replay_list_lock);
534 EXPORT_SYMBOL(dtrq_list_destroy);
537 * Get next req in the replay list
539 * Get next req needs to be replayed, since it is a sorted list
540 * (by master MDT transno)
542 * \param[in] tdtd distribute txn data where the replay list is
544 * \retval the pointer of update recovery header
546 struct distribute_txn_replay_req *
547 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
549 struct distribute_txn_replay_req *dtrq = NULL;
551 spin_lock(&tdtd->tdtd_replay_list_lock);
552 if (!list_empty(&tdtd->tdtd_replay_list)) {
/* Head of the sorted list == lowest master transno == next to replay;
 * unlink it so no other thread picks it up. */
553 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
554 struct distribute_txn_replay_req,
556 list_del_init(&dtrq->dtrq_list);
558 spin_unlock(&tdtd->tdtd_replay_list_lock);
562 EXPORT_SYMBOL(distribute_txn_get_next_req);
565 * Get next transno in the replay list, because this is the sorted
566 * list, so it will return the transno of next req in the list.
568 * \param[in] tdtd distribute txn data where the replay list is
570 * \retval the transno of next update in the list
572 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
574 struct distribute_txn_replay_req *dtrq = NULL;
577 spin_lock(&tdtd->tdtd_replay_list_lock);
578 if (!list_empty(&tdtd->tdtd_replay_list)) {
/* Peek only; unlike distribute_txn_get_next_req() the entry stays
 * on the list. */
579 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
580 struct distribute_txn_replay_req,
582 transno = dtrq->dtrq_master_transno;
584 spin_unlock(&tdtd->tdtd_replay_list_lock);
586 CDEBUG(D_HA, "%s: Next update transno %llu\n",
587 tdtd->tdtd_lut->lut_obd->obd_name, transno);
590 EXPORT_SYMBOL(distribute_txn_get_next_transno);
/* Search the finished-replay list for a req whose master transno matches;
 * takes and releases tdtd_replay_list_lock internally. */
592 struct distribute_txn_replay_req *
593 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
596 struct distribute_txn_replay_req *dtrq = NULL;
597 struct distribute_txn_replay_req *iter;
599 spin_lock(&tdtd->tdtd_replay_list_lock);
600 list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
601 if (iter->dtrq_master_transno == transno) {
/* NOTE(review): match/return path not visible in this view. */
606 spin_unlock(&tdtd->tdtd_replay_list_lock);
/* Return whether the client req's transno was already replayed via the
 * update log (i.e. found on the finish list) rather than by client replay. */
610 bool is_req_replayed_by_update(struct ptlrpc_request *req)
612 struct lu_target *tgt = class_exp2tgt(req->rq_export);
613 struct distribute_txn_replay_req *dtrq;
/* No distribute txn data means DNE update replay is not active here. */
615 if (tgt->lut_tdtd == NULL)
618 dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
619 lustre_msg_get_transno(req->rq_reqmsg));
625 EXPORT_SYMBOL(is_req_replayed_by_update);
628 * Check if the update of one object is committed
630 * Check whether the update for the object is committed by checking whether
631 * the correspondent sub exists in the replay req. If it is committed, mark
632 * the committed flag in correspondent the sub thandle.
634 * \param[in] env execution environment
635 * \param[in] dtrq replay request
636 * \param[in] dt_obj object for the update
637 * \param[in] top_th top thandle
638 * \param[in] st sub thandle which the update belongs to
640 * \retval 1 if the update is not committed.
641 * \retval 0 if the update is committed.
642 * \retval negative errno if some other failures happen.
644 static int update_is_committed(const struct lu_env *env,
645 struct distribute_txn_replay_req *dtrq,
646 struct dt_object *dt_obj,
647 struct top_thandle *top_th,
648 struct sub_thandle *st)
650 struct seq_server_site *seq_site;
651 const struct lu_fid *fid = lu_object_fid(&dt_obj->do_lu);
652 struct distribute_txn_replay_req_sub *dtrqs;
/* Already has a sub transaction or already marked committed: nothing
 * more to decide for this sub thandle. */
656 if (st->st_sub_th != NULL)
659 if (st->st_committed)
/* Resolve which MDT owns this object so we can look up its sub req. */
662 seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
663 if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
/* Update-log FIDs encode the MDT index in the OID. */
664 mdt_index = fid_oid(fid);
665 } else if (!fid_seq_in_fldb(fid_seq(fid))) {
/* Local (non-FLDB) sequence: the object lives on this node. */
666 mdt_index = seq_site->ss_node_id;
668 struct lu_server_fld *fld;
669 struct lu_seq_range range = {0};
672 fld = seq_site->ss_server_fld;
673 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
674 LASSERT(fld->lsf_seq_lookup != NULL);
675 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
679 mdt_index = range.lsr_index;
/* A sub req for this MDT means its update record reached disk there,
 * i.e. the update is committed on that target. */
682 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
683 if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
684 st->st_committed = 1;
686 struct sub_thandle_cookie *stc;
687 struct sub_thandle_cookie *tmp;
/* Hand the llog cookies over to the sub thandle so the records
 * can be cancelled after the whole txn commits. */
689 list_for_each_entry_safe(stc, tmp,
690 &dtrqs->dtrqs_cookie_list,
692 list_move(&stc->stc_list, &st->st_cookie_list);
697 CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
704 * Implementation of different update methods for update recovery.
706 * These following functions update_recovery_$(update_name) implement
707 * different updates recovery methods. They will extract the parameters
708 * from the common parameters area and call correspondent dt API to redo
711 * \param[in] env execution environment
712 * \param[in] op update operation to be replayed
713 * \param[in] params common update parameters which holds all parameters
715 * \param[in] th transaction handle
716 * \param[in] declare indicate it will do declare or real execution, true
717 * means declare, false means real execution
719 * \retval 0 if it succeeds.
720 * \retval negative errno if it fails.
722 static int update_recovery_create(const struct lu_env *env,
723 struct dt_object *dt_obj,
724 const struct update_op *op,
725 const struct update_params *params,
726 struct thandle_exec_args *ta,
729 struct update_thread_info *uti = update_env_info(env);
730 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
731 struct lu_attr *attr = &uti->uti_attr;
733 struct obdo *lobdo = &uti->uti_obdo;
734 struct dt_object_format dof;
736 unsigned int param_count;
/* Object already created (e.g. by an earlier partial replay): skip. */
740 if (dt_object_exists(dt_obj))
743 param_count = lur->lur_update_rec.ur_param_count;
/* Param 0 is the wire obdo carrying the create attributes. */
744 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
748 if (size != sizeof(*wobdo))
/* Records may come from a peer with different endianness. */
751 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
752 lustre_swab_obdo(wobdo);
754 lustre_get_wire_obdo(NULL, lobdo, wobdo);
755 la_from_obdo(attr, lobdo, lobdo->o_valid);
757 dof.dof_type = dt_mode_to_dft(attr->la_mode);
759 rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
/* Replay an OUT_DESTROY update: no parameters needed, just queue the
 * destroy on the object via out_tx_destroy(). */
765 static int update_recovery_destroy(const struct lu_env *env,
766 struct dt_object *dt_obj,
767 const struct update_op *op,
768 const struct update_params *params,
769 struct thandle_exec_args *ta,
775 rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
/* Replay an OUT_REF_ADD update: increment the object's link count. */
780 static int update_recovery_ref_add(const struct lu_env *env,
781 struct dt_object *dt_obj,
782 const struct update_op *op,
783 const struct update_params *params,
784 struct thandle_exec_args *ta,
790 rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
/* Replay an OUT_REF_DEL update: decrement the object's link count. */
795 static int update_recovery_ref_del(const struct lu_env *env,
796 struct dt_object *dt_obj,
797 const struct update_op *op,
798 const struct update_params *params,
799 struct thandle_exec_args *ta,
805 rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
/* Replay an OUT_ATTR_SET update: unpack the wire obdo (param 0),
 * convert it to a lu_attr and queue out_tx_attr_set(). */
810 static int update_recovery_attr_set(const struct lu_env *env,
811 struct dt_object *dt_obj,
812 const struct update_op *op,
813 const struct update_params *params,
814 struct thandle_exec_args *ta,
817 struct update_thread_info *uti = update_env_info(env);
818 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
820 struct obdo *lobdo = &uti->uti_obdo;
821 struct lu_attr *attr = &uti->uti_attr;
823 unsigned int param_count;
827 param_count = lur->lur_update_rec.ur_param_count;
828 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
832 if (size != sizeof(*wobdo))
/* Records may come from a peer with different endianness. */
835 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
836 lustre_swab_obdo(wobdo);
838 lustre_get_wire_obdo(NULL, lobdo, wobdo);
839 la_from_obdo(attr, lobdo, lobdo->o_valid);
841 rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
/* Replay an OUT_XATTR_SET update. Params: [0] xattr name, [1] value
 * buffer, [2] 32-bit little-endian flags (LU_XATTR_*). */
846 static int update_recovery_xattr_set(const struct lu_env *env,
847 struct dt_object *dt_obj,
848 const struct update_op *op,
849 const struct update_params *params,
850 struct thandle_exec_args *ta,
853 struct update_thread_info *uti = update_env_info(env);
862 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
863 name = update_params_get_param_buf(params,
864 op->uop_params_off[0],
869 buf = update_params_get_param_buf(params,
870 op->uop_params_off[1],
875 uti->uti_buf.lb_buf = buf;
876 uti->uti_buf.lb_len = (size_t)size;
878 buf = update_params_get_param_buf(params, op->uop_params_off[2],
882 if (size != sizeof(fl))
885 fl = le32_to_cpu(*(int *)buf);
887 rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
/* Replay an OUT_INDEX_INSERT update. Params: [0] entry name, [1] FID
 * (little-endian on the wire), [2] 32-bit entry type. */
893 static int update_recovery_index_insert(const struct lu_env *env,
894 struct dt_object *dt_obj,
895 const struct update_op *op,
896 const struct update_params *params,
897 struct thandle_exec_args *ta,
900 struct update_thread_info *uti = update_env_info(env);
910 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
911 name = update_params_get_param_buf(params, op->uop_params_off[0],
916 fid = update_params_get_param_buf(params, op->uop_params_off[1],
920 if (size != sizeof(*fid))
/* Swab the FID in place; the record buffer is our private copy. */
923 fid_le_to_cpu(fid, fid);
925 ptype = update_params_get_param_buf(params, op->uop_params_off[2],
929 if (size != sizeof(*ptype))
931 type = le32_to_cpu(*ptype);
/* The target object must behave as a directory for index ops. */
933 if (!dt_try_as_dir(env, dt_obj, false))
936 uti->uti_rec.rec_fid = fid;
937 uti->uti_rec.rec_type = type;
939 rc = out_tx_index_insert(env, dt_obj,
940 (const struct dt_rec *)&uti->uti_rec,
941 (const struct dt_key *)name, ta, th,
/* Replay an OUT_INDEX_DELETE update. Param [0] is the entry name to
 * remove from the index object. */
947 static int update_recovery_index_delete(const struct lu_env *env,
948 struct dt_object *dt_obj,
949 const struct update_op *op,
950 const struct update_params *params,
951 struct thandle_exec_args *ta,
954 struct update_thread_info *uti = update_env_info(env);
961 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
962 name = update_params_get_param_buf(params, op->uop_params_off[0],
967 if (!dt_try_as_dir(env, dt_obj, true))
970 rc = out_tx_index_delete(env, dt_obj,
971 (const struct dt_key *)name, ta, th, NULL, 0);
/* Replay an OUT_WRITE update. Params: [0] data buffer, [1] 64-bit
 * little-endian file offset. */
976 static int update_recovery_write(const struct lu_env *env,
977 struct dt_object *dt_obj,
978 const struct update_op *op,
979 const struct update_params *params,
980 struct thandle_exec_args *ta,
983 struct update_thread_info *uti = update_env_info(env);
991 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
992 buf = update_params_get_param_buf(params, op->uop_params_off[0],
997 uti->uti_buf.lb_buf = buf;
998 uti->uti_buf.lb_len = size;
1000 buf = update_params_get_param_buf(params, op->uop_params_off[1],
1001 param_count, &size);
1005 pos = le64_to_cpu(*(__u64 *)buf);
1007 rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
/* Replay an OUT_XATTR_DEL update. Param [0] is the xattr name. */
1013 static int update_recovery_xattr_del(const struct lu_env *env,
1014 struct dt_object *dt_obj,
1015 const struct update_op *op,
1016 const struct update_params *params,
1017 struct thandle_exec_args *ta,
1020 struct update_thread_info *uti = update_env_info(env);
1027 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1028 name = update_params_get_param_buf(params, op->uop_params_off[0],
1029 param_count, &size);
1033 rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1039 * Update session information
1041 * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1042 * can be called correctly during update replay.
1044 * \param[in] env execution environment.
1045 * \param[in] tdtd distribute data structure of the recovering tgt.
1046 * \param[in] th thandle of this update replay.
1047 * \param[in] master_th master sub thandle.
1048 * \param[in] ta_arg the tx arg structure to hold the update for updating
1051 static void update_recovery_update_ses(struct lu_env *env,
1052 struct target_distribute_txn_data *tdtd,
1054 struct thandle *master_th,
1055 struct distribute_txn_replay_req *dtrq,
1056 struct tx_arg *ta_arg)
1058 struct tgt_session_info *tsi;
1059 struct lu_target *lut = tdtd->tdtd_lut;
1060 struct lsd_reply_header *lrh = &lut->lut_reply_header;
1061 struct lsd_reply_data *lrd;
1062 struct top_thandle *top_th;
1063 struct obd_export *export;
1064 struct cfs_hash *hash;
/* Session already bound to an export: nothing to do. */
1067 tsi = tgt_ses_info(env);
1068 if (tsi->tsi_exp != NULL)
/* Only writes whose payload is exactly a reply-data record are the
 * last_rcvd updates we are interested in. */
1071 size = ta_arg->u.write.buf.lb_len;
1072 lrd = ta_arg->u.write.buf.lb_buf;
1073 if (size != lrh->lrh_reply_size || lrd == NULL)
/* Reply data is stored little-endian on disk; convert in place. */
1076 lrd->lrd_transno = le64_to_cpu(lrd->lrd_transno);
1077 lrd->lrd_xid = le64_to_cpu(lrd->lrd_xid);
1078 lrd->lrd_data = le64_to_cpu(lrd->lrd_data);
1079 lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
1080 lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
1082 CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
/* The reply record must belong to the transno being replayed. */
1083 if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
/* Find the client export by generation via the obd_gen_hash. */
1086 hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1090 export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1091 if (export == NULL) {
1092 cfs_hash_putref(hash);
/* Populate the session so last_rcvd update sees the original client
 * request context (xid, opdata, result, generation). */
1096 tsi->tsi_exp = export;
1097 tsi->tsi_xid = lrd->lrd_xid;
1098 tsi->tsi_opdata = lrd->lrd_data;
1099 tsi->tsi_result = lrd->lrd_result;
1100 tsi->tsi_client_gen = lrd->lrd_client_gen;
1101 dtrq->dtrq_xid = lrd->lrd_xid;
1102 top_th = container_of(th, struct top_thandle, tt_super);
1103 top_th->tt_master_sub_thandle = master_th;
1104 cfs_hash_putref(hash);
1108 * Execute updates in the update replay records
1110 * Declare distribute txn replay by update records and add the updates
1111 * to the execution list. Note: it will check if the update has been
1112 * committed, and only execute the updates if it is not committed to
1115 * \param[in] env execution environment
1116 * \param[in] tdtd distribute txn replay data which hold all of replay
1117 * reqs and all replay parameters.
1118 * \param[in] dtrq distribute transaction replay req.
1119 * \param[in] ta thandle execute args.
1121 * \retval 0 if declare succeeds.
1122 * \retval negative errno if declare fails.
1124 static int update_recovery_exec(const struct lu_env *env,
1125 struct target_distribute_txn_data *tdtd,
1126 struct distribute_txn_replay_req *dtrq,
1127 struct thandle_exec_args *ta)
1129 struct llog_update_record *lur = dtrq->dtrq_lur;
1130 struct update_records *records = &lur->lur_update_rec;
1131 struct update_ops *ops = &records->ur_ops;
1132 struct update_params *params = update_records_get_params(records);
1133 struct top_thandle *top_th = container_of(ta->ta_handle,
1136 struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1137 struct update_op *op;
1142 /* These records have been swabbed in llog_cat_process() */
1143 for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1144 i++, op = update_op_next_op(op)) {
1145 struct lu_fid *fid = &op->uop_fid;
1146 struct dt_object *dt_obj;
1147 struct dt_object *sub_dt_obj;
1148 struct dt_device *sub_dt;
1149 struct sub_thandle *st;
/* NOOPs carry no work; skip to the next op. */
1151 if (op->uop_type == OUT_NOOP)
1154 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1155 if (IS_ERR(dt_obj)) {
1156 rc = PTR_ERR(dt_obj);
1158 LCONSOLE_WARN("%.16s: hit invalid OI mapping "
1159 "for "DFID" during recovering, "
1160 "that may because auto scrub is "
1161 "disabled on related MDT, and "
1162 "will cause recovery failure. "
1163 "Please enable auto scrub and "
1164 "retry the recovery.\n",
1165 tdtd->tdtd_lut->lut_obd->obd_name,
1170 sub_dt_obj = dt_object_child(dt_obj);
1172 /* Create sub thandle if not */
1173 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1174 st = lookup_sub_thandle(tmt, sub_dt);
1176 st = create_sub_thandle(tmt, sub_dt);
1178 GOTO(next, rc = PTR_ERR(st));
1181 /* check if updates on the OSD/OSP are committed */
1182 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1184 /* If this is committed, goto next */
1190 /* Create thandle for sub thandle if needed */
1191 if (st->st_sub_th == NULL) {
1192 rc = sub_thandle_trans_create(env, top_th, st);
/* Dispatch to the per-op replay handler, which queues the declared
 * update on the sub thandle's execution list. */
1197 CDEBUG(D_HA, "replay %uth update\n", i);
1198 switch (op->uop_type) {
1200 rc = update_recovery_create(env, sub_dt_obj,
1205 rc = update_recovery_destroy(env, sub_dt_obj,
1210 rc = update_recovery_ref_add(env, sub_dt_obj,
1215 rc = update_recovery_ref_del(env, sub_dt_obj,
1220 rc = update_recovery_attr_set(env, sub_dt_obj,
1225 rc = update_recovery_xattr_set(env, sub_dt_obj,
1229 case OUT_INDEX_INSERT:
1230 rc = update_recovery_index_insert(env, sub_dt_obj,
1234 case OUT_INDEX_DELETE:
1235 rc = update_recovery_index_delete(env, sub_dt_obj,
1240 rc = update_recovery_write(env, sub_dt_obj,
1245 rc = update_recovery_xattr_del(env, sub_dt_obj,
1250 CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1255 dt_object_put(env, dt_obj);
/* Propagate any failure to the top thandle so the transaction stops. */
1260 ta->ta_handle->th_result = rc;
1265 * redo updates on MDT if needed.
1267 * During DNE recovery, the recovery thread (target_recovery_thread) will call
1268 * this function to replay distribute txn updates on all MDTs. It only replay
1269 * updates on the MDT where the update record is missing.
1271 * If the update already exists on the MDT, then it does not need replay the
1272 * updates on that MDT, and only mark the sub transaction has been committed
1275 * \param[in] env execution environment
1276 * \param[in] tdtd target distribute txn data, which holds the replay list
1277 * and all parameters needed by replay process.
1278 * \param[in] dtrq distribute txn replay req.
1280 * \retval 0 if replay succeeds.
1281 * \retval negative errno if replay failes.
1283 int distribute_txn_replay_handle(struct lu_env *env,
1284 struct target_distribute_txn_data *tdtd,
1285 struct distribute_txn_replay_req *dtrq)
1287 struct update_records *records = &dtrq->dtrq_lur->lur_update_rec;
1288 struct thandle_exec_args *ta;
1289 struct lu_context session_env;
1290 struct thandle *th = NULL;
1291 struct top_thandle *top_th;
1292 struct top_multiple_thandle *tmt;
1293 struct thandle_update_records *tur = NULL;
1298 /* initialize session, it is needed for the handler of target */
1299 rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1301 CERROR("%s: failure to initialize session: rc = %d\n",
1302 tdtd->tdtd_lut->lut_obd->obd_name, rc);
1305 lu_context_enter(&session_env);
1306 env->le_ses = &session_env;
/* Dump the update records being replayed at D_HA for debugging */
1308 update_records_dump(records, D_HA, true);
/* Create the top transaction that will span all involved sub devices */
1309 th = top_trans_create(env, NULL);
1311 GOTO(exit_session, rc = PTR_ERR(th));
1313 ta = &update_env_info(env)->uti_tea;
1316 update_env_info(env)->uti_dtrq = dtrq;
1317 /* Create distribute transaction structure for this top thandle */
1318 top_th = container_of(th, struct top_thandle, tt_super);
1319 rc = top_trans_create_tmt(env, top_th);
1321 GOTO(stop_trans, rc);
1323 th->th_dev = tdtd->tdtd_dt;
1326 /* check if the distribute transaction has been committed */
1327 tmt = top_th->tt_multiple_thandle;
1328 tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1329 tmt->tmt_batchid = dtrq->dtrq_batchid;
1330 tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
/* Batchid at or below the globally committed one means every sub
 * transaction has already made it to disk, so mark it committed up
 * front and no sub updates will be re-executed (see LASSERT below). */
1332 if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1333 tmt->tmt_committed = 1;
/* Build the per-update executor array (ta->ta_args) from the record;
 * presumably only updates missing on some MDT get an entry -- see
 * update_recovery_exec. */
1335 rc = update_recovery_exec(env, tdtd, dtrq, ta);
1337 GOTO(stop_trans, rc);
1339 /* If no updates need to be replayed, then mark this record as
1340 * committed, so commit thread distribute_txn_commit_thread() will
1341 * delete the record */
1342 if (ta->ta_argno == 0)
1343 tmt->tmt_committed = 1;
/* Attach the raw update records (owned by dtrq, see the detach at the
 * end of this function) to the top multiple thandle. */
1345 tur = &update_env_info(env)->uti_tur;
1346 tur->tur_update_records = dtrq->dtrq_lur;
1347 tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1348 tur->tur_update_params = NULL;
1349 tur->tur_update_param_count = 0;
1350 tmt->tmt_update_records = tur;
/* Insert tmt into the batchid-sorted list consumed by the commit
 * thread, then start the top transaction. */
1352 distribute_txn_insert_by_batchid(tmt);
1353 rc = top_trans_start(env, NULL, th);
1355 GOTO(stop_trans, rc);
/* Execute every recorded update against the sub thandle of the device
 * holding its object. */
1357 for (i = 0; i < ta->ta_argno; i++) {
1358 struct tx_arg *ta_arg;
1359 struct dt_object *dt_obj;
1360 struct dt_device *sub_dt;
1361 struct sub_thandle *st;
1363 ta_arg = ta->ta_args[i];
1364 dt_obj = ta_arg->object;
/* An already-committed transaction must not reach the execution
 * phase with pending args. */
1366 LASSERT(tmt->tmt_committed == 0);
1367 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1368 st = lookup_sub_thandle(tmt, sub_dt);
1370 LASSERT(st != NULL);
1371 LASSERT(st->st_sub_th != NULL);
1372 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1375 /* If the update is to update the reply data, then
1376 * we need to set the session information, so
1377 * tgt_last_rcvd_update() can be called correctly */
1378 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1379 update_recovery_update_ses(env, tdtd, th,
1380 st->st_sub_th, dtrq, ta_arg);
/* On failure, roll back this update via its undo callback if one
 * was registered; otherwise only report that undo is unsupported. */
1382 if (unlikely(rc < 0)) {
1383 CDEBUG(D_HA, "error during execution of #%u from"
1384 " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1385 ta->ta_args[i]->line, rc);
1387 if (ta->ta_args[i]->undo_fn != NULL) {
1388 dt_obj = ta->ta_args[i]->object;
1390 lu2dt_dev(dt_obj->do_lu.lo_dev);
1391 st = lookup_sub_thandle(tmt, sub_dt);
1392 LASSERT(st != NULL);
1393 LASSERT(st->st_sub_th != NULL);
1395 ta->ta_args[i]->undo_fn(env,
1399 CERROR("%s: undo for %s:%d: rc = %d\n",
1400 dt_obd_name(ta->ta_handle->th_dev),
1401 ta->ta_args[i]->file,
1402 ta->ta_args[i]->line, -ENOTSUPP);
1407 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1408 dt_obd_name(sub_dt), i, ta->ta_argno, rc);
/* Stop the top transaction; note rc from top_trans_stop() replaces any
 * earlier rc here. Then drop the object references taken when the
 * executor array was built. */
1414 rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1415 for (i = 0; i < ta->ta_argno; i++) {
1416 if (ta->ta_args[i]->object != NULL) {
1417 dt_object_put(env, ta->ta_args[i]->object);
1418 ta->ta_args[i]->object = NULL;
/* Detach the records buffer: it is owned by dtrq, not by the
 * stack-scoped tur -- presumably freed with dtrq; confirm against
 * dtrq_destroy(). */
1423 tur->tur_update_records = NULL;
/* Drop the export reference placed in the session info --
 * NOTE(review): looks like it is taken by update_recovery_update_ses();
 * confirm the ref ownership. */
1425 if (tgt_ses_info(env)->tsi_exp != NULL) {
1426 class_export_put(tgt_ses_info(env)->tsi_exp);
1427 tgt_ses_info(env)->tsi_exp = NULL;
/* Tear down the session context initialized at entry */
1430 lu_context_exit(&session_env);
1431 lu_context_fini(&session_env);
1434 EXPORT_SYMBOL(distribute_txn_replay_handle);