Whamcloud - gitweb
LU-6142 lustre: use list_first/last_entry() for list heads
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2015, 2017, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
29  * This file implements the methods that handle update recovery.
30  *
31  * During DNE recovery, the recovery thread redoes operations in transaction
32  * number order; these replays come either from client replay requests or
33  * from update replay records (for distributed transactions) in the update
34  * log. For distributed transaction replay, the replay thread calls
35  * distribute_txn_replay_handle() to handle the updates.
36  *
37  * After the master MDT restarts, it retrieves the update records from all
38  * MDTs. For each distributed operation it checks the updates on all MDTs;
39  * if some update records are missing on some MDTs, the replay thread redoes
40  * the updates on those MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <lustre_obdo.h>
48 #include <lustre_update.h>
49 #include <lustre_swab.h>
50 #include <md_object.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "tgt_internal.h"
55
56 /**
57  * Lookup distribute_txn_replay req
58  *
59  * Lookup distribute_txn_replay in the replay list by batchid.
60  * It is assumed the list has been locked before calling this function.
61  *
62  * \param[in] tdtd      distribute_txn_data, which holds the replay
63  *                      list.
64  * \param[in] batchid   batchid used by lookup.
65  *
66  * \retval              pointer of the replay if succeeds.
67  * \retval              NULL if can not find it.
68  */
69 static struct distribute_txn_replay_req *
70 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
71 {
72         struct distribute_txn_replay_req        *tmp;
73         struct distribute_txn_replay_req        *dtrq = NULL;
74
75         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
76                 if (tmp->dtrq_batchid == batchid) {
77                         dtrq = tmp;
78                         break;
79                 }
80         }
81         return dtrq;
82 }
83
84 /**
85  * insert distribute txn replay req
86  *
87  * Insert distribute txn replay to the replay list, and it assumes the
88  * list has been looked. Note: the replay list is a sorted list, which
89  * is sorted by master transno. It is assumed the replay list has been
90  * locked before calling this function.
91  *
92  * \param[in] tdtd      target distribute txn data where replay list is
93  * \param[in] new       distribute txn replay to be inserted
94  *
95  * \retval              0 if insertion succeeds
96  * \retval              EEXIST if the dtrq already exists
97  */
98 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
99                         struct distribute_txn_replay_req *new)
100 {
101         struct distribute_txn_replay_req *iter;
102
103         /* Check if the dtrq has been added to the list */
104         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
105         if (iter != NULL)
106                 return -EEXIST;
107
108         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
109                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
110                         continue;
111
112                 /* If there are mulitple replay req with same transno, then
113                  * sort them with batchid */
114                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
115                     iter->dtrq_batchid > new->dtrq_batchid)
116                         continue;
117
118                 list_add(&new->dtrq_list, &iter->dtrq_list);
119                 break;
120         }
121
122         if (list_empty(&new->dtrq_list))
123                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
124
125         return 0;
126 }
127
128 /**
129  * create distribute txn replay req
130  *
131  * Allocate distribute txn replay req according to the update records.
132  *
133  * \param[in] tdtd      target distribute txn data where replay list is.
134  * \param[in] record    update records from the update log.
135  *
136  * \retval              the pointer of distribute txn replay req if
137  *                      the creation succeeds.
138  * \retval              NULL if the creation fails.
139  */
140 static struct distribute_txn_replay_req *
141 dtrq_create(struct target_distribute_txn_data *tdtd,
142             struct llog_update_record *lur)
143 {
144         struct distribute_txn_replay_req *new;
145
146         OBD_ALLOC_PTR(new);
147         if (new == NULL)
148                 RETURN(ERR_PTR(-ENOMEM));
149
150         new->dtrq_lur_size = llog_update_record_size(lur);
151         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
152         if (new->dtrq_lur == NULL) {
153                 OBD_FREE_PTR(new);
154                 RETURN(ERR_PTR(-ENOMEM));
155         }
156
157         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
158
159         /* If the transno in the update record is 0, it means the
160          * update are from master MDT, and it will use the master
161          * last committed transno as its master transno. Later, if
162          * the update records are gotten from slave MDTs, then these
163          * transno will be replaced.
164          * See insert_update_records_to_replay_list(). */
165         if (lur->lur_update_rec.ur_master_transno == 0) {
166                 new->dtrq_lur->lur_update_rec.ur_master_transno =
167                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
168                 new->dtrq_master_transno =
169                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
170         } else {
171                 new->dtrq_master_transno =
172                                 lur->lur_update_rec.ur_master_transno;
173         }
174
175         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
176
177         spin_lock_init(&new->dtrq_sub_list_lock);
178         INIT_LIST_HEAD(&new->dtrq_sub_list);
179         INIT_LIST_HEAD(&new->dtrq_list);
180
181         RETURN(new);
182 }
183
184 /**
185  * Lookup distribute sub replay
186  *
187  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
188  * mdt_index.
189  *
190  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
191  * \param[in] mdt_index                 the mdt_index as the key of lookup
192  *
193  * \retval              the pointer of sub replay if it can be found.
194  * \retval              NULL if it can not find.
195  */
196 struct distribute_txn_replay_req_sub *
197 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
198 {
199         struct distribute_txn_replay_req_sub *dtrqs = NULL;
200         struct distribute_txn_replay_req_sub *tmp;
201
202         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
203                 if (tmp->dtrqs_mdt_index == mdt_index) {
204                         dtrqs = tmp;
205                         break;
206                 }
207         }
208         return dtrqs;
209 }
210
211 /**
212  * Try to add cookie to sub distribute txn request
213  *
214  * Check if the update log cookie has been added to the request, if not,
215  * add it to the dtrqs_cookie_list.
216  *
217  * \param[in] dtrqs     sub replay req where cookies to be added.
218  * \param[in] cookie    cookie to be added.
219  *
220  * \retval              0 if the cookie is adding succeeds.
221  * \retval              negative errno if adding fails.
222  */
223 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
224                                struct llog_cookie *cookie)
225 {
226         struct sub_thandle_cookie *new;
227
228         OBD_ALLOC_PTR(new);
229         if (new == NULL)
230                 return -ENOMEM;
231
232         INIT_LIST_HEAD(&new->stc_list);
233         new->stc_cookie = *cookie;
234         /* Note: only single thread will access one sub_request each time,
235          * so no need lock here */
236         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
237
238         return 0;
239 }
240
241 /**
242  * Insert distribute txn sub req replay
243  *
244  * Allocate sub replay req and insert distribute txn replay list.
245  *
246  * \param[in] dtrq      d to be added
247  * \param[in] cookie    the cookie of the update record
248  * \param[in] mdt_index the mdt_index of the update record
249  *
250  * \retval              0 if the adding succeeds.
251  * \retval              negative errno if the adding fails.
252  */
253 static int
254 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
255                            struct llog_cookie *cookie,
256                            __u32 mdt_index)
257 {
258         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
259         struct distribute_txn_replay_req_sub    *new;
260         int                                     rc;
261         ENTRY;
262
263         spin_lock(&dtrq->dtrq_sub_list_lock);
264         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
265         spin_unlock(&dtrq->dtrq_sub_list_lock);
266         if (dtrqs != NULL) {
267                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
268                 RETURN(0);
269         }
270
271         OBD_ALLOC_PTR(new);
272         if (new == NULL)
273                 RETURN(-ENOMEM);
274
275         INIT_LIST_HEAD(&new->dtrqs_list);
276         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
277         new->dtrqs_mdt_index = mdt_index;
278         spin_lock(&dtrq->dtrq_sub_list_lock);
279         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
280         if (dtrqs == NULL) {
281                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
282                 dtrqs = new;
283         } else {
284                 OBD_FREE_PTR(new);
285         }
286         spin_unlock(&dtrq->dtrq_sub_list_lock);
287
288         rc = dtrq_sub_add_cookie(dtrqs, cookie);
289
290         RETURN(rc);
291 }
292
293 /**
294  * append updates to the current replay updates
295  *
296  * Append more updates to the existent replay update. And this is only
297  * used when combining mulitple updates into one large updates during
298  * replay.
299  *
300  * \param[in] dtrq      the update replay request where the new update
301  *                      records will be added.
302  * \param[in] lur       the new update record.
303  *
304  * \retval              0 if appending succeeds.
305  * \retval              negative errno if appending fails.
306  */
307 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
308                                struct update_records *record)
309 {
310         struct llog_update_record *new_lur;
311         size_t lur_size = dtrq->dtrq_lur_size;
312         void *ptr;
313         ENTRY;
314
315         /* Because several threads might retrieve the same records from
316          * different targets, and we only need one copy of records. So
317          * we will check if the records is in the next one, if not, just
318          * skip it */
319         spin_lock(&dtrq->dtrq_sub_list_lock);
320         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
321                 spin_unlock(&dtrq->dtrq_sub_list_lock);
322                 RETURN(0);
323         }
324         dtrq->dtrq_lur->lur_update_rec.ur_index++;
325         spin_unlock(&dtrq->dtrq_sub_list_lock);
326
327         lur_size += update_records_size(record);
328         OBD_ALLOC_LARGE(new_lur, lur_size);
329         if (new_lur == NULL) {
330                 spin_lock(&dtrq->dtrq_sub_list_lock);
331                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
332                 spin_unlock(&dtrq->dtrq_sub_list_lock);
333                 RETURN(-ENOMEM);
334         }
335
336         /* Copy the old and new records to the new allocated buffer */
337         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
338         ptr = (char *)&new_lur->lur_update_rec +
339                 update_records_size(&new_lur->lur_update_rec);
340         memcpy(ptr, &record->ur_ops,
341                update_records_size(record) -
342                offsetof(struct update_records, ur_ops));
343
344         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
345         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
346         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
347
348         /* Replace the records */
349         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
350         dtrq->dtrq_lur = new_lur;
351         dtrq->dtrq_lur_size = lur_size;
352         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
353         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
354         RETURN(0);
355 }
356
357 /**
358  * Insert update records to the replay list.
359  *
360  * Allocate distribute txn replay req and insert it into the replay
361  * list, then insert the update records into the replay req.
362  *
363  * \param[in] tdtd      distribute txn replay data where the replay list
364  *                      is.
365  * \param[in] record    the update record
366  * \param[in] cookie    cookie of the record
367  * \param[in] index     mdt index of the record
368  *
369  * \retval              0 if the adding succeeds.
370  * \retval              negative errno if the adding fails.
371  */
372 int
373 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
374                                      struct llog_update_record *lur,
375                                      struct llog_cookie *cookie,
376                                      __u32 mdt_index)
377 {
378         struct distribute_txn_replay_req *dtrq;
379         struct update_records *record = &lur->lur_update_rec;
380         bool replace_record = false;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
385                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
386                record->ur_batchid, record->ur_master_transno, mdt_index);
387
388         /* Update batchid if necessary */
389         spin_lock(&tdtd->tdtd_batchid_lock);
390         if (record->ur_batchid >= tdtd->tdtd_batchid) {
391                 CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
392                        tdtd->tdtd_lut->lut_obd->obd_name,
393                        tdtd->tdtd_batchid, record->ur_batchid);
394                 tdtd->tdtd_batchid = record->ur_batchid + 1;
395         }
396         spin_unlock(&tdtd->tdtd_batchid_lock);
397
398 again:
399         spin_lock(&tdtd->tdtd_replay_list_lock);
400         /* First try to build the replay update request with the records */
401         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
402         if (dtrq == NULL) {
403                 spin_unlock(&tdtd->tdtd_replay_list_lock);
404                 dtrq = dtrq_create(tdtd, lur);
405                 if (IS_ERR(dtrq))
406                         RETURN(PTR_ERR(dtrq));
407
408                 spin_lock(&tdtd->tdtd_replay_list_lock);
409                 rc = dtrq_insert(tdtd, dtrq);
410                 if (rc < 0) {
411                         spin_unlock(&tdtd->tdtd_replay_list_lock);
412                         dtrq_destroy(dtrq);
413                         if (rc == -EEXIST)
414                                 goto again;
415                         return rc;
416                 }
417         } else {
418                 /* If the master transno in update header is not
419                 * matched with the one in the record, then it means
420                 * the dtrq is originally created by master record,
421                 * so we need update master transno and reposition
422                 * the dtrq(by master transno) in the list and also
423                 * replace update record */
424                 if (record->ur_master_transno != 0 &&
425                     dtrq->dtrq_master_transno != record->ur_master_transno &&
426                     dtrq->dtrq_lur != NULL) {
427                         list_del_init(&dtrq->dtrq_list);
428                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
429                                                 record->ur_master_transno;
430
431                         dtrq->dtrq_master_transno = record->ur_master_transno;
432                         replace_record = true;
433                         /* try to insert again */
434                         rc = dtrq_insert(tdtd, dtrq);
435                         if (rc < 0) {
436                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
437                                 dtrq_destroy(dtrq);
438                                 return rc;
439                         }
440                 }
441         }
442         spin_unlock(&tdtd->tdtd_replay_list_lock);
443
444         /* Because there should be only thread access the update record, so
445          * we do not need lock here */
446         if (replace_record) {
447                 /* Replace the update record and master transno */
448                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
449                 dtrq->dtrq_lur = NULL;
450                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
451                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
452                 if (dtrq->dtrq_lur == NULL)
453                         return -ENOMEM;
454
455                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
456         }
457
458         /* This is a partial update records, let's try to append
459          * the record to the current replay request */
460         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
461                 rc = dtrq_append_updates(dtrq, record);
462
463         /* Then create and add sub update request */
464         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
465
466         RETURN(rc);
467 }
468 EXPORT_SYMBOL(insert_update_records_to_replay_list);
469
470 /**
471  * Dump updates of distribute txns.
472  *
473  * Output all of recovery updates in the distribute txn list to the
474  * debug log.
475  *
476  * \param[in] tdtd      distribute txn data where all of distribute txn
477  *                      are listed.
478  * \param[in] mask      debug mask
479  */
480 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
481 {
482         struct distribute_txn_replay_req *dtrq;
483
484         spin_lock(&tdtd->tdtd_replay_list_lock);
485         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
486                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
487                                     false);
488         spin_unlock(&tdtd->tdtd_replay_list_lock);
489 }
490 EXPORT_SYMBOL(dtrq_list_dump);
491
492 /**
493  * Destroy distribute txn replay req
494  *
495  * Destroy distribute txn replay req and all of subs.
496  *
497  * \param[in] dtrq      distribute txn replqy req to be destroyed.
498  */
499 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
500 {
501         struct distribute_txn_replay_req_sub    *dtrqs;
502         struct distribute_txn_replay_req_sub    *tmp;
503
504         LASSERT(list_empty(&dtrq->dtrq_list));
505         CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
506                dtrq->dtrq_master_transno);
507         spin_lock(&dtrq->dtrq_sub_list_lock);
508         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
509                 struct sub_thandle_cookie *stc;
510                 struct sub_thandle_cookie *tmp;
511
512                 list_del(&dtrqs->dtrqs_list);
513                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
514                                          stc_list) {
515                         list_del(&stc->stc_list);
516                         OBD_FREE_PTR(stc);
517                 }
518                 OBD_FREE_PTR(dtrqs);
519         }
520         spin_unlock(&dtrq->dtrq_sub_list_lock);
521
522         if (dtrq->dtrq_lur != NULL)
523                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
524
525         OBD_FREE_PTR(dtrq);
526 }
527 EXPORT_SYMBOL(dtrq_destroy);
528
529 /**
530  * Destroy all of replay req.
531  *
532  * Destroy all of replay req in the replay list.
533  *
534  * \param[in] tdtd      target distribute txn data where the replay list is.
535  */
536 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
537 {
538         struct distribute_txn_replay_req *dtrq;
539         struct distribute_txn_replay_req *tmp;
540
541         spin_lock(&tdtd->tdtd_replay_list_lock);
542         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
543                                  dtrq_list) {
544                 list_del_init(&dtrq->dtrq_list);
545                 dtrq_destroy(dtrq);
546         }
547         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
548                                  dtrq_list) {
549                 list_del_init(&dtrq->dtrq_list);
550                 dtrq_destroy(dtrq);
551         }
552         spin_unlock(&tdtd->tdtd_replay_list_lock);
553 }
554 EXPORT_SYMBOL(dtrq_list_destroy);
555
556 /**
557  * Get next req in the replay list
558  *
559  * Get next req needs to be replayed, since it is a sorted list
560  * (by master MDT transno)
561  *
562  * \param[in] tdtd      distribute txn data where the replay list is
563  *
564  * \retval              the pointer of update recovery header
565  */
566 struct distribute_txn_replay_req *
567 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
568 {
569         struct distribute_txn_replay_req *dtrq = NULL;
570
571         spin_lock(&tdtd->tdtd_replay_list_lock);
572         if (!list_empty(&tdtd->tdtd_replay_list)) {
573                 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
574                                         struct distribute_txn_replay_req,
575                                         dtrq_list);
576                 list_del_init(&dtrq->dtrq_list);
577         }
578         spin_unlock(&tdtd->tdtd_replay_list_lock);
579
580         return dtrq;
581 }
582 EXPORT_SYMBOL(distribute_txn_get_next_req);
583
584 /**
585  * Get next transno in the replay list, because this is the sorted
586  * list, so it will return the transno of next req in the list.
587  *
588  * \param[in] tdtd      distribute txn data where the replay list is
589  *
590  * \retval              the transno of next update in the list
591  */
592 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
593 {
594         struct distribute_txn_replay_req        *dtrq = NULL;
595         __u64                                   transno = 0;
596
597         spin_lock(&tdtd->tdtd_replay_list_lock);
598         if (!list_empty(&tdtd->tdtd_replay_list)) {
599                 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
600                                         struct distribute_txn_replay_req,
601                                         dtrq_list);
602                 transno = dtrq->dtrq_master_transno;
603         }
604         spin_unlock(&tdtd->tdtd_replay_list_lock);
605
606         CDEBUG(D_HA, "%s: Next update transno %llu\n",
607                tdtd->tdtd_lut->lut_obd->obd_name, transno);
608         return transno;
609 }
610 EXPORT_SYMBOL(distribute_txn_get_next_transno);
611
612 struct distribute_txn_replay_req *
613 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
614                                   __u64 transno)
615 {
616         struct distribute_txn_replay_req *dtrq = NULL;
617         struct distribute_txn_replay_req *iter;
618
619         spin_lock(&tdtd->tdtd_replay_list_lock);
620         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
621                 if (iter->dtrq_master_transno == transno) {
622                         dtrq = iter;
623                         break;
624                 }
625         }
626         spin_unlock(&tdtd->tdtd_replay_list_lock);
627         return dtrq;
628 }
629
630 bool is_req_replayed_by_update(struct ptlrpc_request *req)
631 {
632         struct lu_target *tgt = class_exp2tgt(req->rq_export);
633         struct distribute_txn_replay_req *dtrq;
634
635         if (tgt->lut_tdtd == NULL)
636                 return false;
637
638         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
639                                         lustre_msg_get_transno(req->rq_reqmsg));
640         if (dtrq == NULL)
641                 return false;
642
643         return true;
644 }
645 EXPORT_SYMBOL(is_req_replayed_by_update);
646
647 /**
648  * Check if the update of one object is committed
649  *
650  * Check whether the update for the object is committed by checking whether
651  * the correspondent sub exists in the replay req. If it is committed, mark
652  * the committed flag in correspondent the sub thandle.
653  *
654  * \param[in] env       execution environment
655  * \param[in] dtrq      replay request
656  * \param[in] dt_obj    object for the update
657  * \param[in] top_th    top thandle
658  * \param[in] sub_th    sub thandle which the update belongs to
659  *
660  * \retval              1 if the update is not committed.
661  * \retval              0 if the update is committed.
662  * \retval              negative errno if some other failures happen.
663  */
664 static int update_is_committed(const struct lu_env *env,
665                                struct distribute_txn_replay_req *dtrq,
666                                struct dt_object *dt_obj,
667                                struct top_thandle *top_th,
668                                struct sub_thandle *st)
669 {
670         struct seq_server_site  *seq_site;
671         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
672         struct distribute_txn_replay_req_sub    *dtrqs;
673         __u32                   mdt_index;
674         ENTRY;
675
676         if (st->st_sub_th != NULL)
677                 RETURN(1);
678
679         if (st->st_committed)
680                 RETURN(0);
681
682         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
683         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
684                 mdt_index = fid_oid(fid);
685         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
686                 mdt_index = seq_site->ss_node_id;
687         } else {
688                 struct lu_server_fld *fld;
689                 struct lu_seq_range range = {0};
690                 int rc;
691
692                 fld = seq_site->ss_server_fld;
693                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
694                 LASSERT(fld->lsf_seq_lookup != NULL);
695                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
696                                          &range);
697                 if (rc < 0)
698                         RETURN(rc);
699                 mdt_index = range.lsr_index;
700         }
701
702         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
703         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
704                 st->st_committed = 1;
705                 if (dtrqs != NULL) {
706                         struct sub_thandle_cookie *stc;
707                         struct sub_thandle_cookie *tmp;
708
709                         list_for_each_entry_safe(stc, tmp,
710                                                  &dtrqs->dtrqs_cookie_list,
711                                                  stc_list)
712                                 list_move(&stc->stc_list, &st->st_cookie_list);
713                 }
714                 RETURN(0);
715         }
716
717         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
718                mdt_index);
719
720         RETURN(1);
721 }
722
723 /**
724  * Implementation of different update methods for update recovery.
725  *
726  * These following functions update_recovery_$(update_name) implement
727  * different updates recovery methods. They will extract the parameters
728  * from the common parameters area and call correspondent dt API to redo
729  * the update.
730  *
731  * \param[in] env       execution environment
732  * \param[in] op        update operation to be replayed
733  * \param[in] params    common update parameters which holds all parameters
734  *                      of the operation
735  * \param[in] th        transaction handle
736  * \param[in] declare   indicate it will do declare or real execution, true
737  *                      means declare, false means real execution
738  *
739  * \retval              0 if it succeeds.
740  * \retval              negative errno if it fails.
741  */
742 static int update_recovery_create(const struct lu_env *env,
743                                   struct dt_object *dt_obj,
744                                   const struct update_op *op,
745                                   const struct update_params *params,
746                                   struct thandle_exec_args *ta,
747                                   struct thandle *th)
748 {
749         struct update_thread_info *uti = update_env_info(env);
750         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
751         struct lu_attr          *attr = &uti->uti_attr;
752         struct obdo             *wobdo;
753         struct obdo             *lobdo = &uti->uti_obdo;
754         struct dt_object_format dof;
755         __u16                   size;
756         unsigned int            param_count;
757         int rc;
758         ENTRY;
759
760         if (dt_object_exists(dt_obj))
761                 RETURN(-EEXIST);
762
763         param_count = lur->lur_update_rec.ur_param_count;
764         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
765                                             param_count, &size);
766         if (wobdo == NULL)
767                 RETURN(-EIO);
768         if (size != sizeof(*wobdo))
769                 RETURN(-EIO);
770
771         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
772                 lustre_swab_obdo(wobdo);
773
774         lustre_get_wire_obdo(NULL, lobdo, wobdo);
775         la_from_obdo(attr, lobdo, lobdo->o_valid);
776
777         dof.dof_type = dt_mode_to_dft(attr->la_mode);
778
779         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
780                            ta, th, NULL, 0);
781
782         RETURN(rc);
783 }
784
785 static int update_recovery_destroy(const struct lu_env *env,
786                                    struct dt_object *dt_obj,
787                                    const struct update_op *op,
788                                    const struct update_params *params,
789                                    struct thandle_exec_args *ta,
790                                    struct thandle *th)
791 {
792         int rc;
793         ENTRY;
794
795         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
796
797         RETURN(rc);
798 }
799
800 static int update_recovery_ref_add(const struct lu_env *env,
801                                    struct dt_object *dt_obj,
802                                    const struct update_op *op,
803                                    const struct update_params *params,
804                                    struct thandle_exec_args *ta,
805                                    struct thandle *th)
806 {
807         int rc;
808         ENTRY;
809
810         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
811
812         RETURN(rc);
813 }
814
815 static int update_recovery_ref_del(const struct lu_env *env,
816                                    struct dt_object *dt_obj,
817                                    const struct update_op *op,
818                                    const struct update_params *params,
819                                    struct thandle_exec_args *ta,
820                                    struct thandle *th)
821 {
822         int rc;
823         ENTRY;
824
825         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
826
827         RETURN(rc);
828 }
829
830 static int update_recovery_attr_set(const struct lu_env *env,
831                                     struct dt_object *dt_obj,
832                                     const struct update_op *op,
833                                     const struct update_params *params,
834                                     struct thandle_exec_args *ta,
835                                     struct thandle *th)
836 {
837         struct update_thread_info *uti = update_env_info(env);
838         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
839         struct obdo     *wobdo;
840         struct obdo     *lobdo = &uti->uti_obdo;
841         struct lu_attr  *attr = &uti->uti_attr;
842         __u16           size;
843         unsigned int    param_count;
844         int             rc;
845         ENTRY;
846
847         param_count = lur->lur_update_rec.ur_param_count;
848         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
849                                             param_count, &size);
850         if (wobdo == NULL)
851                 RETURN(-EIO);
852         if (size != sizeof(*wobdo))
853                 RETURN(-EIO);
854
855         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
856                 lustre_swab_obdo(wobdo);
857
858         lustre_get_wire_obdo(NULL, lobdo, wobdo);
859         la_from_obdo(attr, lobdo, lobdo->o_valid);
860
861         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
862
863         RETURN(rc);
864 }
865
866 static int update_recovery_xattr_set(const struct lu_env *env,
867                                      struct dt_object *dt_obj,
868                                      const struct update_op *op,
869                                      const struct update_params *params,
870                                      struct thandle_exec_args *ta,
871                                      struct thandle *th)
872 {
873         struct update_thread_info *uti = update_env_info(env);
874         char            *buf;
875         char            *name;
876         int             fl;
877         __u16           size;
878         __u32           param_count;
879         int             rc;
880         ENTRY;
881
882         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
883         name = update_params_get_param_buf(params,
884                                            op->uop_params_off[0],
885                                            param_count, &size);
886         if (name == NULL)
887                 RETURN(-EIO);
888
889         buf = update_params_get_param_buf(params,
890                                           op->uop_params_off[1],
891                                           param_count, &size);
892         if (buf == NULL)
893                 RETURN(-EIO);
894
895         uti->uti_buf.lb_buf = buf;
896         uti->uti_buf.lb_len = (size_t)size;
897
898         buf = update_params_get_param_buf(params, op->uop_params_off[2],
899                                           param_count, &size);
900         if (buf == NULL)
901                 RETURN(-EIO);
902         if (size != sizeof(fl))
903                 RETURN(-EIO);
904
905         fl = le32_to_cpu(*(int *)buf);
906
907         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
908                               NULL, 0);
909
910         RETURN(rc);
911 }
912
913 static int update_recovery_index_insert(const struct lu_env *env,
914                                         struct dt_object *dt_obj,
915                                         const struct update_op *op,
916                                         const struct update_params *params,
917                                         struct thandle_exec_args *ta,
918                                         struct thandle *th)
919 {
920         struct update_thread_info *uti = update_env_info(env);
921         struct lu_fid           *fid;
922         char                    *name;
923         __u32                   param_count;
924         __u32                   *ptype;
925         __u32                   type;
926         __u16                   size;
927         int rc;
928         ENTRY;
929
930         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
931         name = update_params_get_param_buf(params, op->uop_params_off[0],
932                                            param_count, &size);
933         if (name == NULL)
934                 RETURN(-EIO);
935
936         fid = update_params_get_param_buf(params, op->uop_params_off[1],
937                                           param_count, &size);
938         if (fid == NULL)
939                 RETURN(-EIO);
940         if (size != sizeof(*fid))
941                 RETURN(-EIO);
942
943         fid_le_to_cpu(fid, fid);
944
945         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
946                                             param_count, &size);
947         if (ptype == NULL)
948                 RETURN(-EIO);
949         if (size != sizeof(*ptype))
950                 RETURN(-EIO);
951         type = le32_to_cpu(*ptype);
952
953         if (!dt_try_as_dir(env, dt_obj, false))
954                 RETURN(-ENOTDIR);
955
956         uti->uti_rec.rec_fid = fid;
957         uti->uti_rec.rec_type = type;
958
959         rc = out_tx_index_insert(env, dt_obj,
960                                  (const struct dt_rec *)&uti->uti_rec,
961                                  (const struct dt_key *)name, ta, th,
962                                  NULL, 0);
963
964         RETURN(rc);
965 }
966
967 static int update_recovery_index_delete(const struct lu_env *env,
968                                         struct dt_object *dt_obj,
969                                         const struct update_op *op,
970                                         const struct update_params *params,
971                                         struct thandle_exec_args *ta,
972                                         struct thandle *th)
973 {
974         struct update_thread_info *uti = update_env_info(env);
975         __u32   param_count;
976         char    *name;
977         __u16   size;
978         int     rc;
979         ENTRY;
980
981         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
982         name = update_params_get_param_buf(params, op->uop_params_off[0],
983                                            param_count, &size);
984         if (name == NULL)
985                 RETURN(-EIO);
986
987         if (!dt_try_as_dir(env, dt_obj, true))
988                 RETURN(-ENOTDIR);
989
990         rc = out_tx_index_delete(env, dt_obj,
991                                  (const struct dt_key *)name, ta, th, NULL, 0);
992
993         RETURN(rc);
994 }
995
996 static int update_recovery_write(const struct lu_env *env,
997                                  struct dt_object *dt_obj,
998                                  const struct update_op *op,
999                                  const struct update_params *params,
1000                                  struct thandle_exec_args *ta,
1001                                  struct thandle *th)
1002 {
1003         struct update_thread_info *uti = update_env_info(env);
1004         char            *buf;
1005         __u32           param_count;
1006         __u64           pos;
1007         __u16           size;
1008         int rc;
1009         ENTRY;
1010
1011         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1012         buf = update_params_get_param_buf(params, op->uop_params_off[0],
1013                                           param_count, &size);
1014         if (buf == NULL)
1015                 RETURN(-EIO);
1016
1017         uti->uti_buf.lb_buf = buf;
1018         uti->uti_buf.lb_len = size;
1019
1020         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1021                                           param_count, &size);
1022         if (buf == NULL)
1023                 RETURN(-EIO);
1024
1025         pos = le64_to_cpu(*(__u64 *)buf);
1026
1027         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1028                           ta, th, NULL, 0);
1029
1030         RETURN(rc);
1031 }
1032
1033 static int update_recovery_xattr_del(const struct lu_env *env,
1034                                      struct dt_object *dt_obj,
1035                                      const struct update_op *op,
1036                                      const struct update_params *params,
1037                                      struct thandle_exec_args *ta,
1038                                      struct thandle *th)
1039 {
1040         struct update_thread_info *uti = update_env_info(env);
1041         __u32   param_count;
1042         char    *name;
1043         __u16   size;
1044         int     rc;
1045         ENTRY;
1046
1047         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1048         name = update_params_get_param_buf(params, op->uop_params_off[0],
1049                                            param_count, &size);
1050         if (name == NULL)
1051                 RETURN(-EIO);
1052
1053         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1054
1055         RETURN(rc);
1056 }
1057
1058 /**
1059  * Update session information
1060  *
1061  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1062  * can be called correctly during update replay.
1063  *
1064  * \param[in] env       execution environment.
1065  * \param[in] tdtd      distribute data structure of the recovering tgt.
1066  * \param[in] th        thandle of this update replay.
1067  * \param[in] master_th master sub thandle.
1068  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1069  *                      reply data.
1070  */
1071 static void update_recovery_update_ses(struct lu_env *env,
1072                                       struct target_distribute_txn_data *tdtd,
1073                                       struct thandle *th,
1074                                       struct thandle *master_th,
1075                                       struct distribute_txn_replay_req *dtrq,
1076                                       struct tx_arg *ta_arg)
1077 {
1078         struct tgt_session_info *tsi;
1079         struct lu_target        *lut = tdtd->tdtd_lut;
1080         struct obd_export       *export;
1081         struct cfs_hash         *hash;
1082         struct top_thandle      *top_th;
1083         struct lsd_reply_data   *lrd;
1084         size_t                  size;
1085
1086         tsi = tgt_ses_info(env);
1087         if (tsi->tsi_exp != NULL)
1088                 return;
1089
1090         size = ta_arg->u.write.buf.lb_len;
1091         lrd = ta_arg->u.write.buf.lb_buf;
1092         if (size != sizeof(*lrd) || lrd == NULL)
1093                 return;
1094
1095         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1096         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1097         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1098         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1099         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1100
1101         CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
1102         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1103                 return;
1104
1105         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1106         if (hash == NULL)
1107                 return;
1108
1109         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1110         if (export == NULL) {
1111                 cfs_hash_putref(hash);
1112                 return;
1113         }
1114
1115         tsi->tsi_exp = export;
1116         tsi->tsi_xid = lrd->lrd_xid;
1117         tsi->tsi_opdata = lrd->lrd_data;
1118         tsi->tsi_result = lrd->lrd_result;
1119         tsi->tsi_client_gen = lrd->lrd_client_gen;
1120         dtrq->dtrq_xid = lrd->lrd_xid;
1121         top_th = container_of(th, struct top_thandle, tt_super);
1122         top_th->tt_master_sub_thandle = master_th;
1123         cfs_hash_putref(hash);
1124 }
1125
1126 /**
1127  * Execute updates in the update replay records
1128  *
1129  * Declare distribute txn replay by update records and add the updates
1130  * to the execution list. Note: it will check if the update has been
1131  * committed, and only execute the updates if it is not committed to
1132  * disk.
1133  *
1134  * \param[in] env       execution environment
1135  * \param[in] tdtd      distribute txn replay data which hold all of replay
1136  *                      reqs and all replay parameters.
1137  * \param[in] dtrq      distribute transaction replay req.
1138  * \param[in] ta        thandle execute args.
1139  *
1140  * \retval              0 if declare succeeds.
1141  * \retval              negative errno if declare fails.
1142  */
1143 static int update_recovery_exec(const struct lu_env *env,
1144                                 struct target_distribute_txn_data *tdtd,
1145                                 struct distribute_txn_replay_req *dtrq,
1146                                 struct thandle_exec_args *ta)
1147 {
1148         struct llog_update_record *lur = dtrq->dtrq_lur;
1149         struct update_records   *records = &lur->lur_update_rec;
1150         struct update_ops       *ops = &records->ur_ops;
1151         struct update_params    *params = update_records_get_params(records);
1152         struct top_thandle      *top_th = container_of(ta->ta_handle,
1153                                                        struct top_thandle,
1154                                                        tt_super);
1155         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1156         struct update_op        *op;
1157         unsigned int            i;
1158         int                     rc = 0;
1159         ENTRY;
1160
1161         /* These records have been swabbed in llog_cat_process() */
1162         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1163              i++, op = update_op_next_op(op)) {
1164                 struct lu_fid           *fid = &op->uop_fid;
1165                 struct dt_object        *dt_obj;
1166                 struct dt_object        *sub_dt_obj;
1167                 struct dt_device        *sub_dt;
1168                 struct sub_thandle      *st;
1169
1170                 if (op->uop_type == OUT_NOOP)
1171                         continue;
1172
1173                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1174                 if (IS_ERR(dt_obj)) {
1175                         rc = PTR_ERR(dt_obj);
1176                         if (rc == -EREMCHG)
1177                                 LCONSOLE_WARN("%.16s: hit invalid OI mapping "
1178                                               "for "DFID" during recovering, "
1179                                               "that may because auto scrub is "
1180                                               "disabled on related MDT, and "
1181                                               "will cause recovery failure. "
1182                                               "Please enable auto scrub and "
1183                                               "retry the recovery.\n",
1184                                               tdtd->tdtd_lut->lut_obd->obd_name,
1185                                               PFID(fid));
1186
1187                         break;
1188                 }
1189                 sub_dt_obj = dt_object_child(dt_obj);
1190
1191                 /* Create sub thandle if not */
1192                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1193                 st = lookup_sub_thandle(tmt, sub_dt);
1194                 if (st == NULL) {
1195                         st = create_sub_thandle(tmt, sub_dt);
1196                         if (IS_ERR(st))
1197                                 GOTO(next, rc = PTR_ERR(st));
1198                 }
1199
1200                 /* check if updates on the OSD/OSP are committed */
1201                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1202                 if (rc == 0)
1203                         /* If this is committed, goto next */
1204                         goto next;
1205
1206                 if (rc < 0)
1207                         GOTO(next, rc);
1208
1209                 /* Create thandle for sub thandle if needed */
1210                 if (st->st_sub_th == NULL) {
1211                         rc = sub_thandle_trans_create(env, top_th, st);
1212                         if (rc != 0)
1213                                 GOTO(next, rc);
1214                 }
1215
1216                 CDEBUG(D_HA, "replay %uth update\n", i);
1217                 switch (op->uop_type) {
1218                 case OUT_CREATE:
1219                         rc = update_recovery_create(env, sub_dt_obj,
1220                                                     op, params, ta,
1221                                                     st->st_sub_th);
1222                         break;
1223                 case OUT_DESTROY:
1224                         rc = update_recovery_destroy(env, sub_dt_obj,
1225                                                      op, params, ta,
1226                                                      st->st_sub_th);
1227                         break;
1228                 case OUT_REF_ADD:
1229                         rc = update_recovery_ref_add(env, sub_dt_obj,
1230                                                      op, params, ta,
1231                                                      st->st_sub_th);
1232                         break;
1233                 case OUT_REF_DEL:
1234                         rc = update_recovery_ref_del(env, sub_dt_obj,
1235                                                      op, params, ta,
1236                                                      st->st_sub_th);
1237                         break;
1238                 case OUT_ATTR_SET:
1239                         rc = update_recovery_attr_set(env, sub_dt_obj,
1240                                                       op, params, ta,
1241                                                       st->st_sub_th);
1242                         break;
1243                 case OUT_XATTR_SET:
1244                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1245                                                        op, params, ta,
1246                                                        st->st_sub_th);
1247                         break;
1248                 case OUT_INDEX_INSERT:
1249                         rc = update_recovery_index_insert(env, sub_dt_obj,
1250                                                           op, params, ta,
1251                                                           st->st_sub_th);
1252                         break;
1253                 case OUT_INDEX_DELETE:
1254                         rc = update_recovery_index_delete(env, sub_dt_obj,
1255                                                           op, params, ta,
1256                                                           st->st_sub_th);
1257                         break;
1258                 case OUT_WRITE:
1259                         rc = update_recovery_write(env, sub_dt_obj,
1260                                                    op, params, ta,
1261                                                    st->st_sub_th);
1262                         break;
1263                 case OUT_XATTR_DEL:
1264                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1265                                                        op, params, ta,
1266                                                        st->st_sub_th);
1267                         break;
1268                 default:
1269                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1270                         rc = -EINVAL;
1271                         break;
1272                 }
1273 next:
1274                 dt_object_put(env, dt_obj);
1275                 if (rc < 0)
1276                         break;
1277         }
1278
1279         ta->ta_handle->th_result = rc;
1280         RETURN(rc);
1281 }
1282
1283 /**
1284  * redo updates on MDT if needed.
1285  *
1286  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1287  * this function to replay distribute txn updates on all MDTs. It only replay
1288  * updates on the MDT where the update record is missing.
1289  *
1290  * If the update already exists on the MDT, then it does not need replay the
1291  * updates on that MDT, and only mark the sub transaction has been committed
1292  * there.
1293  *
1294  * \param[in] env       execution environment
1295  * \param[in] tdtd      target distribute txn data, which holds the replay list
1296  *                      and all parameters needed by replay process.
1297  * \param[in] dtrq      distribute txn replay req.
1298  *
1299  * \retval              0 if replay succeeds.
1300  * \retval              negative errno if replay failes.
1301  */
1302 int distribute_txn_replay_handle(struct lu_env *env,
1303                                  struct target_distribute_txn_data *tdtd,
1304                                  struct distribute_txn_replay_req *dtrq)
1305 {
1306         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1307         struct thandle_exec_args *ta;
1308         struct lu_context       session_env;
1309         struct thandle          *th = NULL;
1310         struct top_thandle      *top_th;
1311         struct top_multiple_thandle *tmt;
1312         struct thandle_update_records *tur = NULL;
1313         int                     i;
1314         int                     rc = 0;
1315         ENTRY;
1316
1317         /* initialize session, it is needed for the handler of target */
1318         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1319         if (rc) {
1320                 CERROR("%s: failure to initialize session: rc = %d\n",
1321                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1322                 RETURN(rc);
1323         }
1324         lu_context_enter(&session_env);
1325         env->le_ses = &session_env;
1326         lu_env_refill(env);
1327         update_records_dump(records, D_HA, true);
1328         th = top_trans_create(env, NULL);
1329         if (IS_ERR(th))
1330                 GOTO(exit_session, rc = PTR_ERR(th));
1331
1332         ta = &update_env_info(env)->uti_tea;
1333         ta->ta_argno = 0;
1334
1335         update_env_info(env)->uti_dtrq = dtrq;
1336         /* Create distribute transaction structure for this top thandle */
1337         top_th = container_of(th, struct top_thandle, tt_super);
1338         rc = top_trans_create_tmt(env, top_th);
1339         if (rc < 0)
1340                 GOTO(stop_trans, rc);
1341
1342         th->th_dev = tdtd->tdtd_dt;
1343         ta->ta_handle = th;
1344
1345         /* check if the distribute transaction has been committed */
1346         tmt = top_th->tt_multiple_thandle;
1347         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1348         tmt->tmt_batchid = dtrq->dtrq_batchid;
1349         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1350
1351         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1352                 tmt->tmt_committed = 1;
1353
1354         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1355         if (rc < 0)
1356                 GOTO(stop_trans, rc);
1357
1358         /* If no updates are needed to be replayed, then mark this records as
1359          * committed, so commit thread distribute_txn_commit_thread() will
1360          * delete the record */
1361         if (ta->ta_argno == 0)
1362                 tmt->tmt_committed = 1;
1363
1364         tur = &update_env_info(env)->uti_tur;
1365         tur->tur_update_records = dtrq->dtrq_lur;
1366         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1367         tur->tur_update_params = NULL;
1368         tur->tur_update_param_count = 0;
1369         tmt->tmt_update_records = tur;
1370
1371         distribute_txn_insert_by_batchid(tmt);
1372         rc = top_trans_start(env, NULL, th);
1373         if (rc < 0)
1374                 GOTO(stop_trans, rc);
1375
1376         for (i = 0; i < ta->ta_argno; i++) {
1377                 struct tx_arg           *ta_arg;
1378                 struct dt_object        *dt_obj;
1379                 struct dt_device        *sub_dt;
1380                 struct sub_thandle      *st;
1381
1382                 ta_arg = ta->ta_args[i];
1383                 dt_obj = ta_arg->object;
1384
1385                 LASSERT(tmt->tmt_committed == 0);
1386                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1387                 st = lookup_sub_thandle(tmt, sub_dt);
1388
1389                 LASSERT(st != NULL);
1390                 LASSERT(st->st_sub_th != NULL);
1391                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1392                                              ta->ta_args[i]);
1393
1394                 /* If the update is to update the reply data, then
1395                  * we need set the session information, so
1396                  * tgt_last_rcvd_update() can be called correctly */
1397                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1398                         update_recovery_update_ses(env, tdtd, th,
1399                                                    st->st_sub_th, dtrq, ta_arg);
1400
1401                 if (unlikely(rc < 0)) {
1402                         CDEBUG(D_HA, "error during execution of #%u from"
1403                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1404                                ta->ta_args[i]->line, rc);
1405                         while (--i > 0) {
1406                                 if (ta->ta_args[i]->undo_fn != NULL) {
1407                                         dt_obj = ta->ta_args[i]->object;
1408                                         sub_dt =
1409                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1410                                         st = lookup_sub_thandle(tmt, sub_dt);
1411                                         LASSERT(st != NULL);
1412                                         LASSERT(st->st_sub_th != NULL);
1413
1414                                         ta->ta_args[i]->undo_fn(env,
1415                                                                st->st_sub_th,
1416                                                                ta->ta_args[i]);
1417                                 } else {
1418                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1419                                              dt_obd_name(ta->ta_handle->th_dev),
1420                                                ta->ta_args[i]->file,
1421                                                ta->ta_args[i]->line, -ENOTSUPP);
1422                                 }
1423                         }
1424                         break;
1425                 }
1426                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1427                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1428         }
1429
1430 stop_trans:
1431         if (rc < 0)
1432                 th->th_result = rc;
1433         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1434         for (i = 0; i < ta->ta_argno; i++) {
1435                 if (ta->ta_args[i]->object != NULL) {
1436                         dt_object_put(env, ta->ta_args[i]->object);
1437                         ta->ta_args[i]->object = NULL;
1438                 }
1439         }
1440
1441         if (tur != NULL)
1442                 tur->tur_update_records = NULL;
1443
1444         if (tgt_ses_info(env)->tsi_exp != NULL) {
1445                 class_export_put(tgt_ses_info(env)->tsi_exp);
1446                 tgt_ses_info(env)->tsi_exp = NULL;
1447         }
1448 exit_session:
1449         lu_context_exit(&session_env);
1450         lu_context_fini(&session_env);
1451         RETURN(rc);
1452 }
1453 EXPORT_SYMBOL(distribute_txn_replay_handle);