4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2015, 2017, Intel Corporation.
27 * lustre/target/update_recovery.c
29 * This file implements the methods to handle the update recovery.
31 * During DNE recovery, the recovery thread will redo the operations according
32 * to the transaction number, and these replays are either from client replay reqs
33 * or from update replay records (for distributed transactions) in the update log.
34 * For distribute transaction replay, the replay thread will call
35 * distribute_txn_replay_handle() to handle the updates.
37 * After the Master MDT restarts, it will retrieve the update records from all
38 * of the MDTs. For each distributed operation, it will check the updates on all
39 * MDTs; if some update records are missing on some MDTs, the replay thread will
40 * redo the updates on these MDTs.
42 * Author: Di Wang <di.wang@intel.com>
44 #define DEBUG_SUBSYSTEM S_CLASS
46 #include <lu_target.h>
47 #include <lustre_obdo.h>
48 #include <lustre_update.h>
49 #include <lustre_swab.h>
50 #include <md_object.h>
52 #include <obd_class.h>
54 #include "tgt_internal.h"
57 * Lookup distribute_txn_replay req
59 * Lookup distribute_txn_replay in the replay list by batchid.
60 * It is assumed the list has been locked before calling this function.
62 * \param[in] tdtd distribute_txn_data, which holds the replay
64 * \param[in] batchid batchid used by lookup.
66 * \retval pointer of the replay if succeeds.
67 * \retval NULL if can not find it.
69 static struct distribute_txn_replay_req *
70 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
72 struct distribute_txn_replay_req *tmp;
73 struct distribute_txn_replay_req *dtrq = NULL;
75 list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
76 if (tmp->dtrq_batchid == batchid) {
85 * insert distribute txn replay req
87 * Insert distribute txn replay to the replay list, and it assumes the
88 * list has been locked. Note: the replay list is a sorted list, which
89 * is sorted by master transno. It is assumed the replay list has been
90 * locked before calling this function.
92 * \param[in] tdtd target distribute txn data where replay list is
93 * \param[in] new distribute txn replay to be inserted
95 * \retval 0 if insertion succeeds
96 * \retval EEXIST if the dtrq already exists
98 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
99 struct distribute_txn_replay_req *new)
101 struct distribute_txn_replay_req *iter;
103 /* Check if the dtrq has been added to the list */
104 iter = dtrq_lookup(tdtd, new->dtrq_batchid);
108 list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
109 if (iter->dtrq_master_transno > new->dtrq_master_transno)
112 /* If there are multiple replay reqs with the same transno, then
113 * sort them by batchid */
114 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
115 iter->dtrq_batchid > new->dtrq_batchid)
118 list_add(&new->dtrq_list, &iter->dtrq_list);
122 if (list_empty(&new->dtrq_list))
123 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
129 * create distribute txn replay req
131 * Allocate distribute txn replay req according to the update records.
133 * \param[in] tdtd target distribute txn data where replay list is.
134 * \param[in] lur update records from the update log.
136 * \retval the pointer of distribute txn replay req if
137 * the creation succeeds.
138 * \retval ERR_PTR(-ENOMEM) if the creation fails.
140 static struct distribute_txn_replay_req *
141 dtrq_create(struct target_distribute_txn_data *tdtd,
142 struct llog_update_record *lur)
144 struct distribute_txn_replay_req *new;
148 RETURN(ERR_PTR(-ENOMEM));
150 new->dtrq_lur_size = llog_update_record_size(lur);
151 OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
152 if (new->dtrq_lur == NULL) {
154 RETURN(ERR_PTR(-ENOMEM));
157 memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
159 /* If the transno in the update record is 0, it means the
160 * updates are from the master MDT, and it will use the master
161 * last committed transno as its master transno. Later, if
162 * the update records are gotten from slave MDTs, then these
163 * transno will be replaced.
164 * See insert_update_records_to_replay_list(). */
165 if (lur->lur_update_rec.ur_master_transno == 0) {
166 new->dtrq_lur->lur_update_rec.ur_master_transno =
167 tdtd->tdtd_lut->lut_obd->obd_last_committed;
168 new->dtrq_master_transno =
169 tdtd->tdtd_lut->lut_obd->obd_last_committed;
171 new->dtrq_master_transno =
172 lur->lur_update_rec.ur_master_transno;
175 new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
177 spin_lock_init(&new->dtrq_sub_list_lock);
178 INIT_LIST_HEAD(&new->dtrq_sub_list);
179 INIT_LIST_HEAD(&new->dtrq_list);
185 * Lookup distribute sub replay
187 * Lookup distribute sub replay in the sub list of distribute_txn_replay by mdt_index.
190 * \param[in] dtrq the distribute txn replay req to lookup
191 * \param[in] mdt_index the mdt_index as the key of lookup
193 * \retval the pointer of sub replay if it can be found.
194 * \retval NULL if it can not find.
196 struct distribute_txn_replay_req_sub *
197 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
199 struct distribute_txn_replay_req_sub *dtrqs = NULL;
200 struct distribute_txn_replay_req_sub *tmp;
202 list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
203 if (tmp->dtrqs_mdt_index == mdt_index) {
212 * Try to add cookie to sub distribute txn request
214 * Check if the update log cookie has been added to the request, if not,
215 * add it to the dtrqs_cookie_list.
217 * \param[in] dtrqs sub replay req where cookies to be added.
218 * \param[in] cookie cookie to be added.
220 * \retval 0 if adding the cookie succeeds.
221 * \retval negative errno if adding fails.
223 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
224 struct llog_cookie *cookie)
226 struct sub_thandle_cookie *new;
232 INIT_LIST_HEAD(&new->stc_list);
233 new->stc_cookie = *cookie;
234 /* Note: only a single thread will access one sub_request at a time,
235 * so no lock is needed here */
236 list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
242 * Insert distribute txn sub req replay
244 * Allocate sub replay req and insert it into the sub list of the distribute txn replay req.
246 * \param[in] dtrq distribute txn replay req to which the sub req is added
247 * \param[in] cookie the cookie of the update record
248 * \param[in] mdt_index the mdt_index of the update record
250 * \retval 0 if the adding succeeds.
251 * \retval negative errno if the adding fails.
254 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
255 struct llog_cookie *cookie,
258 struct distribute_txn_replay_req_sub *dtrqs = NULL;
259 struct distribute_txn_replay_req_sub *new;
263 spin_lock(&dtrq->dtrq_sub_list_lock);
264 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
265 spin_unlock(&dtrq->dtrq_sub_list_lock);
267 rc = dtrq_sub_add_cookie(dtrqs, cookie);
275 INIT_LIST_HEAD(&new->dtrqs_list);
276 INIT_LIST_HEAD(&new->dtrqs_cookie_list);
277 new->dtrqs_mdt_index = mdt_index;
278 spin_lock(&dtrq->dtrq_sub_list_lock);
279 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
281 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
286 spin_unlock(&dtrq->dtrq_sub_list_lock);
288 rc = dtrq_sub_add_cookie(dtrqs, cookie);
294 * append updates to the current replay updates
296 * Append more updates to the existing replay update. This is only
297 * used during replay when combining multiple updates into one large update.
300 * \param[in] dtrq the update replay request where the new update
301 * records will be added.
302 * \param[in] record the new update record.
304 * \retval 0 if appending succeeds.
305 * \retval negative errno if appending fails.
307 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
308 struct update_records *record)
310 struct llog_update_record *new_lur;
311 size_t lur_size = dtrq->dtrq_lur_size;
315 /* Because several threads might retrieve the same records from
316 * different targets, and we only need one copy of records. So
317 * we will check if the records is in the next one, if not, just
319 spin_lock(&dtrq->dtrq_sub_list_lock);
320 if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
321 spin_unlock(&dtrq->dtrq_sub_list_lock);
324 dtrq->dtrq_lur->lur_update_rec.ur_index++;
325 spin_unlock(&dtrq->dtrq_sub_list_lock);
327 lur_size += update_records_size(record);
328 OBD_ALLOC_LARGE(new_lur, lur_size);
329 if (new_lur == NULL) {
330 spin_lock(&dtrq->dtrq_sub_list_lock);
331 dtrq->dtrq_lur->lur_update_rec.ur_index--;
332 spin_unlock(&dtrq->dtrq_sub_list_lock);
336 /* Copy the old and new records to the new allocated buffer */
337 memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
338 ptr = (char *)&new_lur->lur_update_rec +
339 update_records_size(&new_lur->lur_update_rec);
340 memcpy(ptr, &record->ur_ops,
341 update_records_size(record) -
342 offsetof(struct update_records, ur_ops));
344 new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
345 new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
346 new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
348 /* Replace the records */
349 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
350 dtrq->dtrq_lur = new_lur;
351 dtrq->dtrq_lur_size = lur_size;
352 dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
353 update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
358 * Insert update records to the replay list.
360 * Allocate distribute txn replay req and insert it into the replay
361 * list, then insert the update records into the replay req.
363 * \param[in] tdtd distribute txn replay data where the replay list
365 * \param[in] lur the update record
366 * \param[in] cookie cookie of the record
367 * \param[in] mdt_index mdt index of the record
369 * \retval 0 if the adding succeeds.
370 * \retval negative errno if the adding fails.
373 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
374 struct llog_update_record *lur,
375 struct llog_cookie *cookie,
378 struct distribute_txn_replay_req *dtrq;
379 struct update_records *record = &lur->lur_update_rec;
380 bool replace_record = false;
384 CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
385 " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
386 record->ur_batchid, record->ur_master_transno, mdt_index);
388 /* Update batchid if necessary */
389 spin_lock(&tdtd->tdtd_batchid_lock);
390 if (record->ur_batchid >= tdtd->tdtd_batchid) {
391 CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
392 tdtd->tdtd_lut->lut_obd->obd_name,
393 tdtd->tdtd_batchid, record->ur_batchid);
394 tdtd->tdtd_batchid = record->ur_batchid + 1;
396 spin_unlock(&tdtd->tdtd_batchid_lock);
399 spin_lock(&tdtd->tdtd_replay_list_lock);
400 /* First try to build the replay update request with the records */
401 dtrq = dtrq_lookup(tdtd, record->ur_batchid);
403 spin_unlock(&tdtd->tdtd_replay_list_lock);
404 dtrq = dtrq_create(tdtd, lur);
406 RETURN(PTR_ERR(dtrq));
408 spin_lock(&tdtd->tdtd_replay_list_lock);
409 rc = dtrq_insert(tdtd, dtrq);
411 spin_unlock(&tdtd->tdtd_replay_list_lock);
418 /* If the master transno in update header is not
419 * matched with the one in the record, then it means
420 * the dtrq is originally created by master record,
421 * so we need to update master transno and reposition
422 * the dtrq(by master transno) in the list and also
423 * replace update record */
424 if (record->ur_master_transno != 0 &&
425 dtrq->dtrq_master_transno != record->ur_master_transno &&
426 dtrq->dtrq_lur != NULL) {
427 list_del_init(&dtrq->dtrq_list);
428 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
429 record->ur_master_transno;
431 dtrq->dtrq_master_transno = record->ur_master_transno;
432 replace_record = true;
433 /* try to insert again */
434 rc = dtrq_insert(tdtd, dtrq);
436 spin_unlock(&tdtd->tdtd_replay_list_lock);
442 spin_unlock(&tdtd->tdtd_replay_list_lock);
444 /* Because only one thread should access the update record, we do
445 * not need a lock here */
446 if (replace_record) {
447 /* Replace the update record and master transno */
448 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
449 dtrq->dtrq_lur = NULL;
450 dtrq->dtrq_lur_size = llog_update_record_size(lur);
451 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
452 if (dtrq->dtrq_lur == NULL)
455 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
458 /* This is a partial update record, so try to append the record to
459 * the current replay request */
460 if (record->ur_flags & UPDATE_RECORD_CONTINUE)
461 rc = dtrq_append_updates(dtrq, record);
463 /* Then create and add sub update request */
464 rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
468 EXPORT_SYMBOL(insert_update_records_to_replay_list);
471 * Dump updates of distribute txns.
473 * Output all of recovery updates in the distribute txn list to the
476 * \param[in] tdtd distribute txn data where all of distribute txn
478 * \param[in] mask debug mask
480 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
482 struct distribute_txn_replay_req *dtrq;
484 spin_lock(&tdtd->tdtd_replay_list_lock);
485 list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
486 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
488 spin_unlock(&tdtd->tdtd_replay_list_lock);
490 EXPORT_SYMBOL(dtrq_list_dump);
493 * Destroy distribute txn replay req
495 * Destroy distribute txn replay req and all of subs.
497 * \param[in] dtrq distribute txn replay req to be destroyed.
499 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
501 struct distribute_txn_replay_req_sub *dtrqs;
502 struct distribute_txn_replay_req_sub *tmp;
504 LASSERT(list_empty(&dtrq->dtrq_list));
505 CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
506 dtrq->dtrq_master_transno);
507 spin_lock(&dtrq->dtrq_sub_list_lock);
508 list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
509 struct sub_thandle_cookie *stc;
510 struct sub_thandle_cookie *tmp;
512 list_del(&dtrqs->dtrqs_list);
513 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
515 list_del(&stc->stc_list);
520 spin_unlock(&dtrq->dtrq_sub_list_lock);
522 if (dtrq->dtrq_lur != NULL)
523 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
527 EXPORT_SYMBOL(dtrq_destroy);
530 * Destroy all of replay req.
532 * Destroy all of replay req in the replay list.
534 * \param[in] tdtd target distribute txn data where the replay list is.
536 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
538 struct distribute_txn_replay_req *dtrq;
539 struct distribute_txn_replay_req *tmp;
541 spin_lock(&tdtd->tdtd_replay_list_lock);
542 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
544 list_del_init(&dtrq->dtrq_list);
547 list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
549 list_del_init(&dtrq->dtrq_list);
552 spin_unlock(&tdtd->tdtd_replay_list_lock);
554 EXPORT_SYMBOL(dtrq_list_destroy);
557 * Get next req in the replay list
559 * Get the next req that needs to be replayed. Since the list is sorted
560 * (by master MDT transno), this is the first req in the list.
562 * \param[in] tdtd distribute txn data where the replay list is
564 * \retval the pointer of the next distribute txn replay req
566 struct distribute_txn_replay_req *
567 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
569 struct distribute_txn_replay_req *dtrq = NULL;
571 spin_lock(&tdtd->tdtd_replay_list_lock);
572 if (!list_empty(&tdtd->tdtd_replay_list)) {
573 dtrq = list_entry(tdtd->tdtd_replay_list.next,
574 struct distribute_txn_replay_req, dtrq_list);
575 list_del_init(&dtrq->dtrq_list);
577 spin_unlock(&tdtd->tdtd_replay_list_lock);
581 EXPORT_SYMBOL(distribute_txn_get_next_req);
584 * Get the next transno in the replay list. Because the list is sorted,
585 * this returns the transno of the next req in the list.
587 * \param[in] tdtd distribute txn data where the replay list is
589 * \retval the transno of next update in the list
591 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
593 struct distribute_txn_replay_req *dtrq = NULL;
596 spin_lock(&tdtd->tdtd_replay_list_lock);
597 if (!list_empty(&tdtd->tdtd_replay_list)) {
598 dtrq = list_entry(tdtd->tdtd_replay_list.next,
599 struct distribute_txn_replay_req, dtrq_list);
600 transno = dtrq->dtrq_master_transno;
602 spin_unlock(&tdtd->tdtd_replay_list_lock);
604 CDEBUG(D_HA, "%s: Next update transno %llu\n",
605 tdtd->tdtd_lut->lut_obd->obd_name, transno);
608 EXPORT_SYMBOL(distribute_txn_get_next_transno);
610 struct distribute_txn_replay_req *
611 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
614 struct distribute_txn_replay_req *dtrq = NULL;
615 struct distribute_txn_replay_req *iter;
617 spin_lock(&tdtd->tdtd_replay_list_lock);
618 list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
619 if (iter->dtrq_master_transno == transno) {
624 spin_unlock(&tdtd->tdtd_replay_list_lock);
628 bool is_req_replayed_by_update(struct ptlrpc_request *req)
630 struct lu_target *tgt = class_exp2tgt(req->rq_export);
631 struct distribute_txn_replay_req *dtrq;
633 if (tgt->lut_tdtd == NULL)
636 dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
637 lustre_msg_get_transno(req->rq_reqmsg));
643 EXPORT_SYMBOL(is_req_replayed_by_update);
646 * Check if the update of one object is committed
648 * Check whether the update for the object is committed by checking whether
649 * the corresponding sub exists in the replay req. If it is committed, mark
650 * the committed flag in the corresponding sub thandle.
652 * \param[in] env execution environment
653 * \param[in] dtrq replay request
654 * \param[in] dt_obj object for the update
655 * \param[in] top_th top thandle
656 * \param[in] st sub thandle which the update belongs to
658 * \retval 1 if the update is not committed.
659 * \retval 0 if the update is committed.
660 * \retval negative errno if some other failures happen.
662 static int update_is_committed(const struct lu_env *env,
663 struct distribute_txn_replay_req *dtrq,
664 struct dt_object *dt_obj,
665 struct top_thandle *top_th,
666 struct sub_thandle *st)
668 struct seq_server_site *seq_site;
669 const struct lu_fid *fid = lu_object_fid(&dt_obj->do_lu);
670 struct distribute_txn_replay_req_sub *dtrqs;
674 if (st->st_sub_th != NULL)
677 if (st->st_committed)
680 seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
681 if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
682 mdt_index = fid_oid(fid);
683 } else if (!fid_seq_in_fldb(fid_seq(fid))) {
684 mdt_index = seq_site->ss_node_id;
686 struct lu_server_fld *fld;
687 struct lu_seq_range range = {0};
690 fld = seq_site->ss_server_fld;
691 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
692 LASSERT(fld->lsf_seq_lookup != NULL);
693 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
697 mdt_index = range.lsr_index;
700 dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
701 if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
702 st->st_committed = 1;
704 struct sub_thandle_cookie *stc;
705 struct sub_thandle_cookie *tmp;
707 list_for_each_entry_safe(stc, tmp,
708 &dtrqs->dtrqs_cookie_list,
710 list_move(&stc->stc_list, &st->st_cookie_list);
715 CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
722 * Implementation of different update methods for update recovery.
724 * These following functions update_recovery_$(update_name) implement
725 * different updates recovery methods. They will extract the parameters
726 * from the common parameters area and call correspondent dt API to redo
729 * \param[in] env execution environment
730 * \param[in] op update operation to be replayed
731 * \param[in] params common update parameters which holds all parameters
733 * \param[in] th transaction handle
734 * \param[in] declare indicate it will do declare or real execution, true
735 * means declare, false means real execution
737 * \retval 0 if it succeeds.
738 * \retval negative errno if it fails.
740 static int update_recovery_create(const struct lu_env *env,
741 struct dt_object *dt_obj,
742 const struct update_op *op,
743 const struct update_params *params,
744 struct thandle_exec_args *ta,
747 struct update_thread_info *uti = update_env_info(env);
748 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
749 struct lu_attr *attr = &uti->uti_attr;
751 struct obdo *lobdo = &uti->uti_obdo;
752 struct dt_object_format dof;
754 unsigned int param_count;
758 if (dt_object_exists(dt_obj))
761 param_count = lur->lur_update_rec.ur_param_count;
762 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
766 if (size != sizeof(*wobdo))
769 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
770 lustre_swab_obdo(wobdo);
772 lustre_get_wire_obdo(NULL, lobdo, wobdo);
773 la_from_obdo(attr, lobdo, lobdo->o_valid);
775 dof.dof_type = dt_mode_to_dft(attr->la_mode);
777 rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
783 static int update_recovery_destroy(const struct lu_env *env,
784 struct dt_object *dt_obj,
785 const struct update_op *op,
786 const struct update_params *params,
787 struct thandle_exec_args *ta,
793 rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
798 static int update_recovery_ref_add(const struct lu_env *env,
799 struct dt_object *dt_obj,
800 const struct update_op *op,
801 const struct update_params *params,
802 struct thandle_exec_args *ta,
808 rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
813 static int update_recovery_ref_del(const struct lu_env *env,
814 struct dt_object *dt_obj,
815 const struct update_op *op,
816 const struct update_params *params,
817 struct thandle_exec_args *ta,
823 rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
828 static int update_recovery_attr_set(const struct lu_env *env,
829 struct dt_object *dt_obj,
830 const struct update_op *op,
831 const struct update_params *params,
832 struct thandle_exec_args *ta,
835 struct update_thread_info *uti = update_env_info(env);
836 struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
838 struct obdo *lobdo = &uti->uti_obdo;
839 struct lu_attr *attr = &uti->uti_attr;
841 unsigned int param_count;
845 param_count = lur->lur_update_rec.ur_param_count;
846 wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
850 if (size != sizeof(*wobdo))
853 if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
854 lustre_swab_obdo(wobdo);
856 lustre_get_wire_obdo(NULL, lobdo, wobdo);
857 la_from_obdo(attr, lobdo, lobdo->o_valid);
859 rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
864 static int update_recovery_xattr_set(const struct lu_env *env,
865 struct dt_object *dt_obj,
866 const struct update_op *op,
867 const struct update_params *params,
868 struct thandle_exec_args *ta,
871 struct update_thread_info *uti = update_env_info(env);
880 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
881 name = update_params_get_param_buf(params,
882 op->uop_params_off[0],
887 buf = update_params_get_param_buf(params,
888 op->uop_params_off[1],
893 uti->uti_buf.lb_buf = buf;
894 uti->uti_buf.lb_len = (size_t)size;
896 buf = update_params_get_param_buf(params, op->uop_params_off[2],
900 if (size != sizeof(fl))
903 fl = le32_to_cpu(*(int *)buf);
905 rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
911 static int update_recovery_index_insert(const struct lu_env *env,
912 struct dt_object *dt_obj,
913 const struct update_op *op,
914 const struct update_params *params,
915 struct thandle_exec_args *ta,
918 struct update_thread_info *uti = update_env_info(env);
928 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
929 name = update_params_get_param_buf(params, op->uop_params_off[0],
934 fid = update_params_get_param_buf(params, op->uop_params_off[1],
938 if (size != sizeof(*fid))
941 fid_le_to_cpu(fid, fid);
943 ptype = update_params_get_param_buf(params, op->uop_params_off[2],
947 if (size != sizeof(*ptype))
949 type = le32_to_cpu(*ptype);
951 if (dt_try_as_dir(env, dt_obj) == 0)
954 uti->uti_rec.rec_fid = fid;
955 uti->uti_rec.rec_type = type;
957 rc = out_tx_index_insert(env, dt_obj,
958 (const struct dt_rec *)&uti->uti_rec,
959 (const struct dt_key *)name, ta, th,
965 static int update_recovery_index_delete(const struct lu_env *env,
966 struct dt_object *dt_obj,
967 const struct update_op *op,
968 const struct update_params *params,
969 struct thandle_exec_args *ta,
972 struct update_thread_info *uti = update_env_info(env);
979 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
980 name = update_params_get_param_buf(params, op->uop_params_off[0],
985 if (dt_try_as_dir(env, dt_obj) == 0)
988 rc = out_tx_index_delete(env, dt_obj,
989 (const struct dt_key *)name, ta, th, NULL, 0);
994 static int update_recovery_write(const struct lu_env *env,
995 struct dt_object *dt_obj,
996 const struct update_op *op,
997 const struct update_params *params,
998 struct thandle_exec_args *ta,
1001 struct update_thread_info *uti = update_env_info(env);
1009 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1010 buf = update_params_get_param_buf(params, op->uop_params_off[0],
1011 param_count, &size);
1015 uti->uti_buf.lb_buf = buf;
1016 uti->uti_buf.lb_len = size;
1018 buf = update_params_get_param_buf(params, op->uop_params_off[1],
1019 param_count, &size);
1023 pos = le64_to_cpu(*(__u64 *)buf);
1025 rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1031 static int update_recovery_xattr_del(const struct lu_env *env,
1032 struct dt_object *dt_obj,
1033 const struct update_op *op,
1034 const struct update_params *params,
1035 struct thandle_exec_args *ta,
1038 struct update_thread_info *uti = update_env_info(env);
1045 param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1046 name = update_params_get_param_buf(params, op->uop_params_off[0],
1047 param_count, &size);
1051 rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1057 * Update session information
1059 * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1060 * can be called correctly during update replay.
1062 * \param[in] env execution environment.
1063 * \param[in] tdtd distribute data structure of the recovering tgt.
1064 * \param[in] th thandle of this update replay.
1065 * \param[in] master_th master sub thandle.
1066 * \param[in] ta_arg the tx arg structure to hold the update for updating
1069 static void update_recovery_update_ses(struct lu_env *env,
1070 struct target_distribute_txn_data *tdtd,
1072 struct thandle *master_th,
1073 struct distribute_txn_replay_req *dtrq,
1074 struct tx_arg *ta_arg)
1076 struct tgt_session_info *tsi;
1077 struct lu_target *lut = tdtd->tdtd_lut;
1078 struct obd_export *export;
1079 struct cfs_hash *hash;
1080 struct top_thandle *top_th;
1081 struct lsd_reply_data *lrd;
1084 tsi = tgt_ses_info(env);
1085 if (tsi->tsi_exp != NULL)
1088 size = ta_arg->u.write.buf.lb_len;
1089 lrd = ta_arg->u.write.buf.lb_buf;
1090 if (size != sizeof(*lrd) || lrd == NULL)
1093 lrd->lrd_transno = le64_to_cpu(lrd->lrd_transno);
1094 lrd->lrd_xid = le64_to_cpu(lrd->lrd_xid);
1095 lrd->lrd_data = le64_to_cpu(lrd->lrd_data);
1096 lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
1097 lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
1099 CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
1100 if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1103 hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1107 export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1108 if (export == NULL) {
1109 cfs_hash_putref(hash);
1113 tsi->tsi_exp = export;
1114 tsi->tsi_xid = lrd->lrd_xid;
1115 tsi->tsi_opdata = lrd->lrd_data;
1116 tsi->tsi_result = lrd->lrd_result;
1117 tsi->tsi_client_gen = lrd->lrd_client_gen;
1118 dtrq->dtrq_xid = lrd->lrd_xid;
1119 top_th = container_of(th, struct top_thandle, tt_super);
1120 top_th->tt_master_sub_thandle = master_th;
1121 cfs_hash_putref(hash);
1125 * Execute updates in the update replay records
1127 * Declare distribute txn replay by update records and add the updates
1128 * to the execution list. Note: it will check if the update has been
1129 * committed, and only execute the updates if it is not committed to
1132 * \param[in] env execution environment
1133 * \param[in] tdtd distribute txn replay data which hold all of replay
1134 * reqs and all replay parameters.
1135 * \param[in] dtrq distribute transaction replay req.
1136 * \param[in] ta thandle execute args.
1138 * \retval 0 if declare succeeds.
1139 * \retval negative errno if declare fails.
1141 static int update_recovery_exec(const struct lu_env *env,
1142 struct target_distribute_txn_data *tdtd,
1143 struct distribute_txn_replay_req *dtrq,
1144 struct thandle_exec_args *ta)
1146 struct llog_update_record *lur = dtrq->dtrq_lur;
1147 struct update_records *records = &lur->lur_update_rec;
1148 struct update_ops *ops = &records->ur_ops;
1149 struct update_params *params = update_records_get_params(records);
1150 struct top_thandle *top_th = container_of(ta->ta_handle,
1153 struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1154 struct update_op *op;
1159 /* These records have been swabbed in llog_cat_process() */
1160 for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1161 i++, op = update_op_next_op(op)) {
1162 struct lu_fid *fid = &op->uop_fid;
1163 struct dt_object *dt_obj;
1164 struct dt_object *sub_dt_obj;
1165 struct dt_device *sub_dt;
1166 struct sub_thandle *st;
1168 if (op->uop_type == OUT_NOOP)
1171 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1172 if (IS_ERR(dt_obj)) {
1173 rc = PTR_ERR(dt_obj);
1175 LCONSOLE_WARN("%.16s: hit invalid OI mapping "
1176 "for "DFID" during recovering, "
1177 "that may because auto scrub is "
1178 "disabled on related MDT, and "
1179 "will cause recovery failure. "
1180 "Please enable auto scrub and "
1181 "retry the recovery.\n",
1182 tdtd->tdtd_lut->lut_obd->obd_name,
1187 sub_dt_obj = dt_object_child(dt_obj);
1189 /* Create sub thandle if not */
1190 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1191 st = lookup_sub_thandle(tmt, sub_dt);
1193 st = create_sub_thandle(tmt, sub_dt);
1195 GOTO(next, rc = PTR_ERR(st));
1198 /* check if updates on the OSD/OSP are committed */
1199 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1201 /* If this is committed, goto next */
1207 /* Create thandle for sub thandle if needed */
1208 if (st->st_sub_th == NULL) {
1209 rc = sub_thandle_trans_create(env, top_th, st);
1214 CDEBUG(D_HA, "replay %uth update\n", i);
1215 switch (op->uop_type) {
1217 rc = update_recovery_create(env, sub_dt_obj,
1222 rc = update_recovery_destroy(env, sub_dt_obj,
1227 rc = update_recovery_ref_add(env, sub_dt_obj,
1232 rc = update_recovery_ref_del(env, sub_dt_obj,
1237 rc = update_recovery_attr_set(env, sub_dt_obj,
1242 rc = update_recovery_xattr_set(env, sub_dt_obj,
1246 case OUT_INDEX_INSERT:
1247 rc = update_recovery_index_insert(env, sub_dt_obj,
1251 case OUT_INDEX_DELETE:
1252 rc = update_recovery_index_delete(env, sub_dt_obj,
1257 rc = update_recovery_write(env, sub_dt_obj,
1262 rc = update_recovery_xattr_del(env, sub_dt_obj,
1267 CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1272 dt_object_put(env, dt_obj);
1277 ta->ta_handle->th_result = rc;
1282 * redo updates on MDT if needed.
1284 * During DNE recovery, the recovery thread (target_recovery_thread) will call
1285 * this function to replay distribute txn updates on all MDTs. It only replays
1286 * updates on the MDT where the update record is missing.
1288 * If the update already exists on the MDT, then it does not need to replay the
1289 * updates on that MDT, and only marks the sub transaction as committed
1292 * \param[in] env execution environment
1293 * \param[in] tdtd target distribute txn data, which holds the replay list
1294 * and all parameters needed by replay process.
1295 * \param[in] dtrq distribute txn replay req.
1297 * \retval 0 if replay succeeds.
1298 * \retval negative errno if replay fails.
1300 int distribute_txn_replay_handle(struct lu_env *env,
1301 struct target_distribute_txn_data *tdtd,
1302 struct distribute_txn_replay_req *dtrq)
1304 struct update_records *records = &dtrq->dtrq_lur->lur_update_rec;
1305 struct thandle_exec_args *ta;
1306 struct lu_context session_env;
1307 struct thandle *th = NULL;
1308 struct top_thandle *top_th;
1309 struct top_multiple_thandle *tmt;
1310 struct thandle_update_records *tur = NULL;
/*
 * Overview of the visible flow: set up a server session context for env,
 * create a top transaction, run update_recovery_exec() with the per-env
 * exec args, start the transaction, call exec_fn of every queued arg on
 * the sub thandle of its object's device, then stop the transaction and
 * release the objects and the session.
 *
 * NOTE(review): the embedded line numbers in this listing skip values
 * (e.g. 1311-1314, 1317, 1327), so some statements are not visible here --
 * presumably the local declarations of rc/i and the checks guarding the
 * GOTO() calls; confirm against the full source.
 */
1315 /* initialize session, it is needed for the handler of target */
1316 rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1318 CERROR("%s: failure to initialize session: rc = %d\n",
1319 tdtd->tdtd_lut->lut_obd->obd_name, rc);
/* Enter the new session context and install it as the env's session. */
1322 lu_context_enter(&session_env);
1323 env->le_ses = &session_env;
/*
 * Dump the update records at the D_HA debug level, then create the top
 * transaction handle; a failure here skips straight to session teardown.
 */
1325 update_records_dump(records, D_HA, true);
1326 th = top_trans_create(env, NULL);
1328 GOTO(exit_session, rc = PTR_ERR(th));
/*
 * Use the exec args stored in the per-env update info and record the
 * replay request there as well.
 */
1330 ta = &update_env_info(env)->uti_tea;
1333 update_env_info(env)->uti_dtrq = dtrq;
1334 /* Create distribute transaction structure for this top thandle */
1335 top_th = container_of(th, struct top_thandle, tt_super);
1336 rc = top_trans_create_tmt(env, top_th);
1338 GOTO(stop_trans, rc);
1340 th->th_dev = tdtd->tdtd_dt;
1343 /* check if the distribute transaction has been committed */
1344 tmt = top_th->tt_multiple_thandle;
1345 tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1346 tmt->tmt_batchid = dtrq->dtrq_batchid;
1347 tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
/*
 * A batchid at or below the committed batchid recorded in tdtd marks the
 * whole transaction as already committed.
 */
1349 if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1350 tmt->tmt_committed = 1;
/* ta->ta_argno is checked below to see whether anything needs replaying. */
1352 rc = update_recovery_exec(env, tdtd, dtrq, ta);
1354 GOTO(stop_trans, rc);
1356 /* If no updates are needed to be replayed, then mark this records as
1357 * committed, so commit thread distribute_txn_commit_thread() will
1358 * delete the record */
1359 if (ta->ta_argno == 0)
1360 tmt->tmt_committed = 1;
/*
 * Point the per-env thandle_update_records at the request's update
 * records (with their buffer size) and attach it to tmt; no update
 * params are attached (NULL pointer, zero count).
 */
1362 tur = &update_env_info(env)->uti_tur;
1363 tur->tur_update_records = dtrq->dtrq_lur;
1364 tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1365 tur->tur_update_params = NULL;
1366 tur->tur_update_param_count = 0;
1367 tmt->tmt_update_records = tur;
/*
 * NOTE(review): going by its name, the helper inserts tmt into a list
 * ordered by batchid -- confirm. It is called before the transaction is
 * started.
 */
1369 distribute_txn_insert_by_batchid(tmt);
1370 rc = top_trans_start(env, NULL, th);
1372 GOTO(stop_trans, rc);
/* Execute every queued arg on the sub thandle of its object's device. */
1374 for (i = 0; i < ta->ta_argno; i++) {
1375 struct tx_arg *ta_arg;
1376 struct dt_object *dt_obj;
1377 struct dt_device *sub_dt;
1378 struct sub_thandle *st;
1380 ta_arg = ta->ta_args[i];
1381 dt_obj = ta_arg->object;
/*
 * Asserted invariants: nothing is executed for a transaction marked
 * committed, and the sub thandle for the object's device, including its
 * st_sub_th, already exists at this point.
 */
1383 LASSERT(tmt->tmt_committed == 0);
1384 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1385 st = lookup_sub_thandle(tmt, sub_dt);
1387 LASSERT(st != NULL);
1388 LASSERT(st->st_sub_th != NULL);
1389 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1392 /* If the update is to update the reply data, then
1393 * we need set the session information, so
1394 * tgt_last_rcvd_update() can be called correctly */
1395 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1396 update_recovery_update_ses(env, tdtd, th,
1397 st->st_sub_th, dtrq, ta_arg);
1399 if (unlikely(rc < 0)) {
1400 CDEBUG(D_HA, "error during execution of #%u from"
1401 " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1402 ta->ta_args[i]->line, rc);
/*
 * Failure path: an arg with an undo_fn has it called on the sub thandle
 * of its object's device; otherwise an error with -ENOTSUPP is logged.
 * NOTE(review): embedded line 1403 is not visible in this listing, so
 * which args this undo code is applied to cannot be confirmed from here.
 */
1404 if (ta->ta_args[i]->undo_fn != NULL) {
1405 dt_obj = ta->ta_args[i]->object;
1407 lu2dt_dev(dt_obj->do_lu.lo_dev);
1408 st = lookup_sub_thandle(tmt, sub_dt);
1409 LASSERT(st != NULL);
1410 LASSERT(st->st_sub_th != NULL);
1412 ta->ta_args[i]->undo_fn(env,
1416 CERROR("%s: undo for %s:%d: rc = %d\n",
1417 dt_obd_name(ta->ta_handle->th_dev),
1418 ta->ta_args[i]->file,
1419 ta->ta_args[i]->line, -ENOTSUPP);
1424 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1425 dt_obd_name(sub_dt), i, ta->ta_argno, rc);
/*
 * Stop the top transaction; rc is overwritten with top_trans_stop()'s
 * result. Afterwards drop the reference on the object of every exec arg
 * and clear the pointer so it is not put twice.
 */
1431 rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1432 for (i = 0; i < ta->ta_argno; i++) {
1433 if (ta->ta_args[i]->object != NULL) {
1434 dt_object_put(env, ta->ta_args[i]->object);
1435 ta->ta_args[i]->object = NULL;
/*
 * Detach the request's update records from tur.
 * NOTE(review): tur is initialised to NULL and only assigned after
 * update_recovery_exec(); a NULL guard is presumably on embedded line
 * 1439, which is not visible here -- confirm.
 */
1440 tur->tur_update_records = NULL;
/* Drop the export reference held in the session info, if one was set. */
1442 if (tgt_ses_info(env)->tsi_exp != NULL) {
1443 class_export_put(tgt_ses_info(env)->tsi_exp);
1444 tgt_ses_info(env)->tsi_exp = NULL;
/* Leave and finalize the session context initialized at the top. */
1447 lu_context_exit(&session_env);
1448 lu_context_fini(&session_env);
1451 EXPORT_SYMBOL(distribute_txn_replay_handle);