Whamcloud - gitweb
LU-7117 osp: set ptlrpc_request::rq_allow_replay properly
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2015, Intel Corporation.
24  */
25
/*
 * lustre/target/update_recovery.c
 *
 * This file implements the methods that handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in the order
 * of their transaction numbers. These replays come either from client
 * replay requests or, for distributed transactions, from the update replay
 * records in the update log. For distributed transaction replay, the replay
 * thread calls distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * MDTs. For each distributed operation it checks the updates on all MDTs;
 * if update records are missing on some MDTs, the replay thread redoes the
 * updates on those MDTs.
 *
 * Author: Di Wang <di.wang@intel.com>
 */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <lustre_obdo.h>
48 #include <lustre_update.h>
49 #include <lustre_swab.h>
50 #include <md_object.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "tgt_internal.h"
55
56 /**
57  * Lookup distribute_txn_replay req
58  *
59  * Lookup distribute_txn_replay in the replay list by batchid.
60  * It is assumed the list has been locked before calling this function.
61  *
62  * \param[in] tdtd      distribute_txn_data, which holds the replay
63  *                      list.
64  * \param[in] batchid   batchid used by lookup.
65  *
66  * \retval              pointer of the replay if succeeds.
67  * \retval              NULL if can not find it.
68  */
69 static struct distribute_txn_replay_req *
70 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
71 {
72         struct distribute_txn_replay_req        *tmp;
73         struct distribute_txn_replay_req        *dtrq = NULL;
74
75         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
76                 if (tmp->dtrq_batchid == batchid) {
77                         dtrq = tmp;
78                         break;
79                 }
80         }
81         return dtrq;
82 }
83
84 /**
85  * insert distribute txn replay req
86  *
87  * Insert distribute txn replay to the replay list, and it assumes the
88  * list has been looked. Note: the replay list is a sorted list, which
89  * is sorted by master transno. It is assumed the replay list has been
90  * locked before calling this function.
91  *
92  * \param[in] tdtd      target distribute txn data where replay list is
93  * \param[in] new       distribute txn replay to be inserted
94  *
95  * \retval              0 if insertion succeeds
96  * \retval              EEXIST if the dtrq already exists
97  */
98 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
99                         struct distribute_txn_replay_req *new)
100 {
101         struct distribute_txn_replay_req *iter;
102
103         /* Check if the dtrq has been added to the list */
104         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
105         if (iter != NULL)
106                 return -EEXIST;
107
108         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
109                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
110                         continue;
111
112                 /* If there are mulitple replay req with same transno, then
113                  * sort them with batchid */
114                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
115                     iter->dtrq_batchid > new->dtrq_batchid)
116                         continue;
117
118                 list_add(&new->dtrq_list, &iter->dtrq_list);
119                 break;
120         }
121
122         if (list_empty(&new->dtrq_list))
123                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
124
125         return 0;
126 }
127
128 /**
129  * create distribute txn replay req
130  *
131  * Allocate distribute txn replay req according to the update records.
132  *
133  * \param[in] tdtd      target distribute txn data where replay list is.
134  * \param[in] record    update records from the update log.
135  *
136  * \retval              the pointer of distribute txn replay req if
137  *                      the creation succeeds.
138  * \retval              NULL if the creation fails.
139  */
140 static struct distribute_txn_replay_req *
141 dtrq_create(struct target_distribute_txn_data *tdtd,
142             struct llog_update_record *lur)
143 {
144         struct distribute_txn_replay_req *new;
145
146         OBD_ALLOC_PTR(new);
147         if (new == NULL)
148                 RETURN(ERR_PTR(-ENOMEM));
149
150         new->dtrq_lur_size = llog_update_record_size(lur);
151         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
152         if (new->dtrq_lur == NULL) {
153                 OBD_FREE_PTR(new);
154                 RETURN(ERR_PTR(-ENOMEM));
155         }
156
157         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
158
159         /* If the transno in the update record is 0, it means the
160          * update are from master MDT, and it will use the master
161          * last committed transno as its master transno. Later, if
162          * the update records are gotten from slave MDTs, then these
163          * transno will be replaced.
164          * See insert_update_records_to_replay_list(). */
165         if (lur->lur_update_rec.ur_master_transno == 0) {
166                 new->dtrq_lur->lur_update_rec.ur_master_transno =
167                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
168                 new->dtrq_master_transno =
169                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
170         } else {
171                 new->dtrq_master_transno =
172                                 lur->lur_update_rec.ur_master_transno;
173         }
174
175         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
176
177         spin_lock_init(&new->dtrq_sub_list_lock);
178         INIT_LIST_HEAD(&new->dtrq_sub_list);
179         INIT_LIST_HEAD(&new->dtrq_list);
180
181         RETURN(new);
182 }
183
184 /**
185  * Lookup distribute sub replay
186  *
187  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
188  * mdt_index.
189  *
190  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
191  * \param[in] mdt_index                 the mdt_index as the key of lookup
192  *
193  * \retval              the pointer of sub replay if it can be found.
194  * \retval              NULL if it can not find.
195  */
196 struct distribute_txn_replay_req_sub *
197 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
198 {
199         struct distribute_txn_replay_req_sub *dtrqs = NULL;
200         struct distribute_txn_replay_req_sub *tmp;
201
202         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
203                 if (tmp->dtrqs_mdt_index == mdt_index) {
204                         dtrqs = tmp;
205                         break;
206                 }
207         }
208         return dtrqs;
209 }
210
211 /**
212  * Try to add cookie to sub distribute txn request
213  *
214  * Check if the update log cookie has been added to the request, if not,
215  * add it to the dtrqs_cookie_list.
216  *
217  * \param[in] dtrqs     sub replay req where cookies to be added.
218  * \param[in] cookie    cookie to be added.
219  *
220  * \retval              0 if the cookie is adding succeeds.
221  * \retval              negative errno if adding fails.
222  */
223 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
224                                struct llog_cookie *cookie)
225 {
226         struct sub_thandle_cookie *new;
227
228         OBD_ALLOC_PTR(new);
229         if (new == NULL)
230                 return -ENOMEM;
231
232         INIT_LIST_HEAD(&new->stc_list);
233         new->stc_cookie = *cookie;
234         /* Note: only single thread will access one sub_request each time,
235          * so no need lock here */
236         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
237
238         return 0;
239 }
240
241 /**
242  * Insert distribute txn sub req replay
243  *
244  * Allocate sub replay req and insert distribute txn replay list.
245  *
246  * \param[in] dtrq      d to be added
247  * \param[in] cookie    the cookie of the update record
248  * \param[in] mdt_index the mdt_index of the update record
249  *
250  * \retval              0 if the adding succeeds.
251  * \retval              negative errno if the adding fails.
252  */
253 static int
254 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
255                            struct llog_cookie *cookie,
256                            __u32 mdt_index)
257 {
258         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
259         struct distribute_txn_replay_req_sub    *new;
260         int                                     rc;
261         ENTRY;
262
263         spin_lock(&dtrq->dtrq_sub_list_lock);
264         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
265         spin_unlock(&dtrq->dtrq_sub_list_lock);
266         if (dtrqs != NULL) {
267                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
268                 RETURN(0);
269         }
270
271         OBD_ALLOC_PTR(new);
272         if (new == NULL)
273                 RETURN(-ENOMEM);
274
275         INIT_LIST_HEAD(&new->dtrqs_list);
276         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
277         new->dtrqs_mdt_index = mdt_index;
278         spin_lock(&dtrq->dtrq_sub_list_lock);
279         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
280         if (dtrqs == NULL) {
281                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
282                 dtrqs = new;
283         } else {
284                 OBD_FREE_PTR(new);
285         }
286         spin_unlock(&dtrq->dtrq_sub_list_lock);
287
288         rc = dtrq_sub_add_cookie(dtrqs, cookie);
289
290         RETURN(rc);
291 }
292
293 /**
294  * append updates to the current replay updates
295  *
296  * Append more updates to the existent replay update. And this is only
297  * used when combining mulitple updates into one large updates during
298  * replay.
299  *
300  * \param[in] dtrq      the update replay request where the new update
301  *                      records will be added.
302  * \param[in] lur       the new update record.
303  *
304  * \retval              0 if appending succeeds.
305  * \retval              negative errno if appending fails.
306  */
307 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
308                                struct update_records *record)
309 {
310         struct llog_update_record *new_lur;
311         size_t lur_size = dtrq->dtrq_lur_size;
312         void *ptr;
313         ENTRY;
314
315         /* Because several threads might retrieve the same records from
316          * different targets, and we only need one copy of records. So
317          * we will check if the records is in the next one, if not, just
318          * skip it */
319         spin_lock(&dtrq->dtrq_sub_list_lock);
320         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
321                 spin_unlock(&dtrq->dtrq_sub_list_lock);
322                 RETURN(0);
323         }
324         dtrq->dtrq_lur->lur_update_rec.ur_index++;
325         spin_unlock(&dtrq->dtrq_sub_list_lock);
326
327         lur_size += update_records_size(record);
328         OBD_ALLOC_LARGE(new_lur, lur_size);
329         if (new_lur == NULL) {
330                 spin_lock(&dtrq->dtrq_sub_list_lock);
331                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
332                 spin_unlock(&dtrq->dtrq_sub_list_lock);
333                 RETURN(-ENOMEM);
334         }
335
336         /* Copy the old and new records to the new allocated buffer */
337         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
338         ptr = (char *)&new_lur->lur_update_rec +
339                 update_records_size(&new_lur->lur_update_rec);
340         memcpy(ptr, &record->ur_ops,
341                update_records_size(record) -
342                offsetof(struct update_records, ur_ops));
343
344         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
345         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
346         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
347
348         /* Replace the records */
349         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
350         dtrq->dtrq_lur = new_lur;
351         dtrq->dtrq_lur_size = lur_size;
352         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
353         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
354         RETURN(0);
355 }
356
357 /**
358  * Insert update records to the replay list.
359  *
360  * Allocate distribute txn replay req and insert it into the replay
361  * list, then insert the update records into the replay req.
362  *
363  * \param[in] tdtd      distribute txn replay data where the replay list
364  *                      is.
365  * \param[in] record    the update record
366  * \param[in] cookie    cookie of the record
367  * \param[in] index     mdt index of the record
368  *
369  * \retval              0 if the adding succeeds.
370  * \retval              negative errno if the adding fails.
371  */
372 int
373 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
374                                      struct llog_update_record *lur,
375                                      struct llog_cookie *cookie,
376                                      __u32 mdt_index)
377 {
378         struct distribute_txn_replay_req *dtrq;
379         struct update_records *record = &lur->lur_update_rec;
380         bool replace_record = false;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
385                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
386                record->ur_batchid, record->ur_master_transno, mdt_index);
387
388         /* Update batchid if necessary */
389         spin_lock(&tdtd->tdtd_batchid_lock);
390         if (record->ur_batchid >= tdtd->tdtd_batchid) {
391                 CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
392                        tdtd->tdtd_lut->lut_obd->obd_name,
393                        tdtd->tdtd_batchid, record->ur_batchid);
394                 tdtd->tdtd_batchid = record->ur_batchid + 1;
395         }
396         spin_unlock(&tdtd->tdtd_batchid_lock);
397
398 again:
399         spin_lock(&tdtd->tdtd_replay_list_lock);
400         /* First try to build the replay update request with the records */
401         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
402         if (dtrq == NULL) {
403                 spin_unlock(&tdtd->tdtd_replay_list_lock);
404                 dtrq = dtrq_create(tdtd, lur);
405                 if (IS_ERR(dtrq))
406                         RETURN(PTR_ERR(dtrq));
407
408                 spin_lock(&tdtd->tdtd_replay_list_lock);
409                 rc = dtrq_insert(tdtd, dtrq);
410                 if (rc < 0) {
411                         spin_unlock(&tdtd->tdtd_replay_list_lock);
412                         dtrq_destroy(dtrq);
413                         if (rc == -EEXIST)
414                                 goto again;
415                         return rc;
416                 }
417         } else {
418                 /* If the master transno in update header is not
419                 * matched with the one in the record, then it means
420                 * the dtrq is originally created by master record,
421                 * so we need update master transno and reposition
422                 * the dtrq(by master transno) in the list and also
423                 * replace update record */
424                 if (record->ur_master_transno != 0 &&
425                     dtrq->dtrq_master_transno != record->ur_master_transno &&
426                     dtrq->dtrq_lur != NULL) {
427                         list_del_init(&dtrq->dtrq_list);
428                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
429                                                 record->ur_master_transno;
430
431                         dtrq->dtrq_master_transno = record->ur_master_transno;
432                         replace_record = true;
433                         /* try to insert again */
434                         rc = dtrq_insert(tdtd, dtrq);
435                         if (rc < 0) {
436                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
437                                 dtrq_destroy(dtrq);
438                                 return rc;
439                         }
440                 }
441         }
442         spin_unlock(&tdtd->tdtd_replay_list_lock);
443
444         /* Because there should be only thread access the update record, so
445          * we do not need lock here */
446         if (replace_record) {
447                 /* Replace the update record and master transno */
448                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
449                 dtrq->dtrq_lur = NULL;
450                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
451                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
452                 if (dtrq->dtrq_lur == NULL)
453                         return -ENOMEM;
454
455                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
456         }
457
458         /* This is a partial update records, let's try to append
459          * the record to the current replay request */
460         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
461                 rc = dtrq_append_updates(dtrq, record);
462
463         /* Then create and add sub update request */
464         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
465
466         RETURN(rc);
467 }
468 EXPORT_SYMBOL(insert_update_records_to_replay_list);
469
470 /**
471  * Dump updates of distribute txns.
472  *
473  * Output all of recovery updates in the distribute txn list to the
474  * debug log.
475  *
476  * \param[in] tdtd      distribute txn data where all of distribute txn
477  *                      are listed.
478  * \param[in] mask      debug mask
479  */
480 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
481 {
482         struct distribute_txn_replay_req *dtrq;
483
484         spin_lock(&tdtd->tdtd_replay_list_lock);
485         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
486                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
487                                     false);
488         spin_unlock(&tdtd->tdtd_replay_list_lock);
489 }
490 EXPORT_SYMBOL(dtrq_list_dump);
491
492 /**
493  * Destroy distribute txn replay req
494  *
495  * Destroy distribute txn replay req and all of subs.
496  *
497  * \param[in] dtrq      distribute txn replqy req to be destroyed.
498  */
499 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
500 {
501         struct distribute_txn_replay_req_sub    *dtrqs;
502         struct distribute_txn_replay_req_sub    *tmp;
503
504         LASSERT(list_empty(&dtrq->dtrq_list));
505         spin_lock(&dtrq->dtrq_sub_list_lock);
506         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
507                 struct sub_thandle_cookie *stc;
508                 struct sub_thandle_cookie *tmp;
509
510                 list_del(&dtrqs->dtrqs_list);
511                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
512                                          stc_list) {
513                         list_del(&stc->stc_list);
514                         OBD_FREE_PTR(stc);
515                 }
516                 OBD_FREE_PTR(dtrqs);
517         }
518         spin_unlock(&dtrq->dtrq_sub_list_lock);
519
520         if (dtrq->dtrq_lur != NULL)
521                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
522
523         OBD_FREE_PTR(dtrq);
524 }
525 EXPORT_SYMBOL(dtrq_destroy);
526
527 /**
528  * Destroy all of replay req.
529  *
530  * Destroy all of replay req in the replay list.
531  *
532  * \param[in] tdtd      target distribute txn data where the replay list is.
533  */
534 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
535 {
536         struct distribute_txn_replay_req *dtrq;
537         struct distribute_txn_replay_req *tmp;
538
539         spin_lock(&tdtd->tdtd_replay_list_lock);
540         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
541                                  dtrq_list) {
542                 list_del_init(&dtrq->dtrq_list);
543                 dtrq_destroy(dtrq);
544         }
545         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
546                                  dtrq_list) {
547                 list_del_init(&dtrq->dtrq_list);
548                 dtrq_destroy(dtrq);
549         }
550         spin_unlock(&tdtd->tdtd_replay_list_lock);
551 }
552 EXPORT_SYMBOL(dtrq_list_destroy);
553
554 /**
555  * Get next req in the replay list
556  *
557  * Get next req needs to be replayed, since it is a sorted list
558  * (by master MDT transno)
559  *
560  * \param[in] tdtd      distribute txn data where the replay list is
561  *
562  * \retval              the pointer of update recovery header
563  */
564 struct distribute_txn_replay_req *
565 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
566 {
567         struct distribute_txn_replay_req *dtrq = NULL;
568
569         spin_lock(&tdtd->tdtd_replay_list_lock);
570         if (!list_empty(&tdtd->tdtd_replay_list)) {
571                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
572                                  struct distribute_txn_replay_req, dtrq_list);
573                 list_del_init(&dtrq->dtrq_list);
574         }
575         spin_unlock(&tdtd->tdtd_replay_list_lock);
576
577         return dtrq;
578 }
579 EXPORT_SYMBOL(distribute_txn_get_next_req);
580
581 /**
582  * Get next transno in the replay list, because this is the sorted
583  * list, so it will return the transno of next req in the list.
584  *
585  * \param[in] tdtd      distribute txn data where the replay list is
586  *
587  * \retval              the transno of next update in the list
588  */
589 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
590 {
591         struct distribute_txn_replay_req        *dtrq = NULL;
592         __u64                                   transno = 0;
593
594         spin_lock(&tdtd->tdtd_replay_list_lock);
595         if (!list_empty(&tdtd->tdtd_replay_list)) {
596                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
597                                  struct distribute_txn_replay_req, dtrq_list);
598                 transno = dtrq->dtrq_master_transno;
599         }
600         spin_unlock(&tdtd->tdtd_replay_list_lock);
601
602         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
603                tdtd->tdtd_lut->lut_obd->obd_name, transno);
604         return transno;
605 }
606 EXPORT_SYMBOL(distribute_txn_get_next_transno);
607
608 struct distribute_txn_replay_req *
609 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
610                                   __u64 xid)
611 {
612         struct distribute_txn_replay_req *dtrq = NULL;
613         struct distribute_txn_replay_req *iter;
614
615         spin_lock(&tdtd->tdtd_replay_list_lock);
616         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
617                 if (iter->dtrq_xid == xid) {
618                         dtrq = iter;
619                         break;
620                 }
621         }
622         spin_unlock(&tdtd->tdtd_replay_list_lock);
623         return dtrq;
624 }
625
626 bool is_req_replayed_by_update(struct ptlrpc_request *req)
627 {
628         struct lu_target *tgt = class_exp2tgt(req->rq_export);
629         struct distribute_txn_replay_req *dtrq;
630
631         if (tgt->lut_tdtd == NULL)
632                 return false;
633
634         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
635         if (dtrq == NULL)
636                 return false;
637
638         return true;
639 }
640 EXPORT_SYMBOL(is_req_replayed_by_update);
641
642 /**
643  * Check if the update of one object is committed
644  *
645  * Check whether the update for the object is committed by checking whether
646  * the correspondent sub exists in the replay req. If it is committed, mark
647  * the committed flag in correspondent the sub thandle.
648  *
649  * \param[in] env       execution environment
650  * \param[in] dtrq      replay request
651  * \param[in] dt_obj    object for the update
652  * \param[in] top_th    top thandle
653  * \param[in] sub_th    sub thandle which the update belongs to
654  *
655  * \retval              1 if the update is not committed.
656  * \retval              0 if the update is committed.
657  * \retval              negative errno if some other failures happen.
658  */
659 static int update_is_committed(const struct lu_env *env,
660                                struct distribute_txn_replay_req *dtrq,
661                                struct dt_object *dt_obj,
662                                struct top_thandle *top_th,
663                                struct sub_thandle *st)
664 {
665         struct seq_server_site  *seq_site;
666         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
667         struct distribute_txn_replay_req_sub    *dtrqs;
668         __u32                   mdt_index;
669         ENTRY;
670
671         if (st->st_sub_th != NULL)
672                 RETURN(1);
673
674         if (st->st_committed)
675                 RETURN(0);
676
677         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
678         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
679                 mdt_index = fid_oid(fid);
680         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
681                 mdt_index = seq_site->ss_node_id;
682         } else {
683                 struct lu_server_fld *fld;
684                 struct lu_seq_range range = {0};
685                 int rc;
686
687                 fld = seq_site->ss_server_fld;
688                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
689                 LASSERT(fld->lsf_seq_lookup != NULL);
690                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
691                                          &range);
692                 if (rc < 0)
693                         RETURN(rc);
694                 mdt_index = range.lsr_index;
695         }
696
697         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
698         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
699                 st->st_committed = 1;
700                 if (dtrqs != NULL) {
701                         struct sub_thandle_cookie *stc;
702                         struct sub_thandle_cookie *tmp;
703
704                         list_for_each_entry_safe(stc, tmp,
705                                                  &dtrqs->dtrqs_cookie_list,
706                                                  stc_list)
707                                 list_move(&stc->stc_list, &st->st_cookie_list);
708                 }
709                 RETURN(0);
710         }
711
712         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
713                mdt_index);
714
715         RETURN(1);
716 }
717
718 /**
719  * Implementation of different update methods for update recovery.
720  *
721  * These following functions update_recovery_$(update_name) implement
722  * different updates recovery methods. They will extract the parameters
723  * from the common parameters area and call correspondent dt API to redo
724  * the update.
725  *
726  * \param[in] env       execution environment
727  * \param[in] op        update operation to be replayed
728  * \param[in] params    common update parameters which holds all parameters
729  *                      of the operation
730  * \param[in] th        transaction handle
731  * \param[in] declare   indicate it will do declare or real execution, true
732  *                      means declare, false means real execution
733  *
734  * \retval              0 if it succeeds.
735  * \retval              negative errno if it fails.
736  */
737 static int update_recovery_create(const struct lu_env *env,
738                                   struct dt_object *dt_obj,
739                                   const struct update_op *op,
740                                   const struct update_params *params,
741                                   struct thandle_exec_args *ta,
742                                   struct thandle *th)
743 {
744         struct update_thread_info *uti = update_env_info(env);
745         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
746         struct lu_attr          *attr = &uti->uti_attr;
747         struct obdo             *wobdo;
748         struct obdo             *lobdo = &uti->uti_obdo;
749         struct dt_object_format dof;
750         __u16                   size;
751         unsigned int            param_count;
752         int rc;
753         ENTRY;
754
755         if (dt_object_exists(dt_obj))
756                 RETURN(-EEXIST);
757
758         param_count = lur->lur_update_rec.ur_param_count;
759         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
760                                             param_count, &size);
761         if (wobdo == NULL)
762                 RETURN(-EIO);
763         if (size != sizeof(*wobdo))
764                 RETURN(-EIO);
765
766         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
767                 lustre_swab_obdo(wobdo);
768
769         lustre_get_wire_obdo(NULL, lobdo, wobdo);
770         la_from_obdo(attr, lobdo, lobdo->o_valid);
771
772         dof.dof_type = dt_mode_to_dft(attr->la_mode);
773
774         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
775                            ta, th, NULL, 0);
776
777         RETURN(rc);
778 }
779
780 static int update_recovery_destroy(const struct lu_env *env,
781                                    struct dt_object *dt_obj,
782                                    const struct update_op *op,
783                                    const struct update_params *params,
784                                    struct thandle_exec_args *ta,
785                                    struct thandle *th)
786 {
787         int rc;
788         ENTRY;
789
790         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
791
792         RETURN(rc);
793 }
794
795 static int update_recovery_ref_add(const struct lu_env *env,
796                                    struct dt_object *dt_obj,
797                                    const struct update_op *op,
798                                    const struct update_params *params,
799                                    struct thandle_exec_args *ta,
800                                    struct thandle *th)
801 {
802         int rc;
803         ENTRY;
804
805         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
806
807         RETURN(rc);
808 }
809
810 static int update_recovery_ref_del(const struct lu_env *env,
811                                    struct dt_object *dt_obj,
812                                    const struct update_op *op,
813                                    const struct update_params *params,
814                                    struct thandle_exec_args *ta,
815                                    struct thandle *th)
816 {
817         int rc;
818         ENTRY;
819
820         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
821
822         RETURN(rc);
823 }
824
825 static int update_recovery_attr_set(const struct lu_env *env,
826                                     struct dt_object *dt_obj,
827                                     const struct update_op *op,
828                                     const struct update_params *params,
829                                     struct thandle_exec_args *ta,
830                                     struct thandle *th)
831 {
832         struct update_thread_info *uti = update_env_info(env);
833         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
834         struct obdo     *wobdo;
835         struct obdo     *lobdo = &uti->uti_obdo;
836         struct lu_attr  *attr = &uti->uti_attr;
837         __u16           size;
838         unsigned int    param_count;
839         int             rc;
840         ENTRY;
841
842         param_count = lur->lur_update_rec.ur_param_count;
843         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
844                                             param_count, &size);
845         if (wobdo == NULL)
846                 RETURN(-EIO);
847         if (size != sizeof(*wobdo))
848                 RETURN(-EIO);
849
850         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
851                 lustre_swab_obdo(wobdo);
852
853         lustre_get_wire_obdo(NULL, lobdo, wobdo);
854         la_from_obdo(attr, lobdo, lobdo->o_valid);
855
856         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
857
858         RETURN(rc);
859 }
860
861 static int update_recovery_xattr_set(const struct lu_env *env,
862                                      struct dt_object *dt_obj,
863                                      const struct update_op *op,
864                                      const struct update_params *params,
865                                      struct thandle_exec_args *ta,
866                                      struct thandle *th)
867 {
868         struct update_thread_info *uti = update_env_info(env);
869         char            *buf;
870         char            *name;
871         int             fl;
872         __u16           size;
873         __u32           param_count;
874         int             rc;
875         ENTRY;
876
877         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
878         name = update_params_get_param_buf(params,
879                                            op->uop_params_off[0],
880                                            param_count, &size);
881         if (name == NULL)
882                 RETURN(-EIO);
883
884         buf = update_params_get_param_buf(params,
885                                           op->uop_params_off[1],
886                                           param_count, &size);
887         if (buf == NULL)
888                 RETURN(-EIO);
889
890         uti->uti_buf.lb_buf = buf;
891         uti->uti_buf.lb_len = (size_t)size;
892
893         buf = update_params_get_param_buf(params, op->uop_params_off[2],
894                                           param_count, &size);
895         if (buf == NULL)
896                 RETURN(-EIO);
897         if (size != sizeof(fl))
898                 RETURN(-EIO);
899
900         fl = le32_to_cpu(*(int *)buf);
901
902         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
903                               NULL, 0);
904
905         RETURN(rc);
906 }
907
908 static int update_recovery_index_insert(const struct lu_env *env,
909                                         struct dt_object *dt_obj,
910                                         const struct update_op *op,
911                                         const struct update_params *params,
912                                         struct thandle_exec_args *ta,
913                                         struct thandle *th)
914 {
915         struct update_thread_info *uti = update_env_info(env);
916         struct lu_fid           *fid;
917         char                    *name;
918         __u32                   param_count;
919         __u32                   *ptype;
920         __u32                   type;
921         __u16                   size;
922         int rc;
923         ENTRY;
924
925         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
926         name = update_params_get_param_buf(params, op->uop_params_off[0],
927                                            param_count, &size);
928         if (name == NULL)
929                 RETURN(-EIO);
930
931         fid = update_params_get_param_buf(params, op->uop_params_off[1],
932                                           param_count, &size);
933         if (fid == NULL)
934                 RETURN(-EIO);
935         if (size != sizeof(*fid))
936                 RETURN(-EIO);
937
938         fid_le_to_cpu(fid, fid);
939
940         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
941                                             param_count, &size);
942         if (ptype == NULL)
943                 RETURN(-EIO);
944         if (size != sizeof(*ptype))
945                 RETURN(-EIO);
946         type = le32_to_cpu(*ptype);
947
948         if (dt_try_as_dir(env, dt_obj) == 0)
949                 RETURN(-ENOTDIR);
950
951         uti->uti_rec.rec_fid = fid;
952         uti->uti_rec.rec_type = type;
953
954         rc = out_tx_index_insert(env, dt_obj,
955                                  (const struct dt_rec *)&uti->uti_rec,
956                                  (const struct dt_key *)name, ta, th,
957                                  NULL, 0);
958
959         RETURN(rc);
960 }
961
962 static int update_recovery_index_delete(const struct lu_env *env,
963                                         struct dt_object *dt_obj,
964                                         const struct update_op *op,
965                                         const struct update_params *params,
966                                         struct thandle_exec_args *ta,
967                                         struct thandle *th)
968 {
969         struct update_thread_info *uti = update_env_info(env);
970         __u32   param_count;
971         char    *name;
972         __u16   size;
973         int     rc;
974         ENTRY;
975
976         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
977         name = update_params_get_param_buf(params, op->uop_params_off[0],
978                                            param_count, &size);
979         if (name == NULL)
980                 RETURN(-EIO);
981
982         if (dt_try_as_dir(env, dt_obj) == 0)
983                 RETURN(-ENOTDIR);
984
985         rc = out_tx_index_delete(env, dt_obj,
986                                  (const struct dt_key *)name, ta, th, NULL, 0);
987
988         RETURN(rc);
989 }
990
991 static int update_recovery_write(const struct lu_env *env,
992                                  struct dt_object *dt_obj,
993                                  const struct update_op *op,
994                                  const struct update_params *params,
995                                  struct thandle_exec_args *ta,
996                                  struct thandle *th)
997 {
998         struct update_thread_info *uti = update_env_info(env);
999         char            *buf;
1000         __u32           param_count;
1001         __u64           pos;
1002         __u16           size;
1003         int rc;
1004         ENTRY;
1005
1006         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1007         buf = update_params_get_param_buf(params, op->uop_params_off[0],
1008                                           param_count, &size);
1009         if (buf == NULL)
1010                 RETURN(-EIO);
1011
1012         uti->uti_buf.lb_buf = buf;
1013         uti->uti_buf.lb_len = size;
1014
1015         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1016                                           param_count, &size);
1017         if (buf == NULL)
1018                 RETURN(-EIO);
1019
1020         pos = le64_to_cpu(*(__u64 *)buf);
1021
1022         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1023                           ta, th, NULL, 0);
1024
1025         RETURN(rc);
1026 }
1027
1028 static int update_recovery_xattr_del(const struct lu_env *env,
1029                                      struct dt_object *dt_obj,
1030                                      const struct update_op *op,
1031                                      const struct update_params *params,
1032                                      struct thandle_exec_args *ta,
1033                                      struct thandle *th)
1034 {
1035         struct update_thread_info *uti = update_env_info(env);
1036         __u32   param_count;
1037         char    *name;
1038         __u16   size;
1039         int     rc;
1040         ENTRY;
1041
1042         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1043         name = update_params_get_param_buf(params, op->uop_params_off[0],
1044                                            param_count, &size);
1045         if (name == NULL)
1046                 RETURN(-EIO);
1047
1048         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1049
1050         RETURN(rc);
1051 }
1052
1053 /**
1054  * Update session information
1055  *
1056  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1057  * can be called correctly during update replay.
1058  *
1059  * \param[in] env       execution environment.
1060  * \param[in] tdtd      distribute data structure of the recovering tgt.
1061  * \param[in] th        thandle of this update replay.
1062  * \param[in] master_th master sub thandle.
1063  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1064  *                      reply data.
1065  */
1066 static void update_recovery_update_ses(struct lu_env *env,
1067                                       struct target_distribute_txn_data *tdtd,
1068                                       struct thandle *th,
1069                                       struct thandle *master_th,
1070                                       struct distribute_txn_replay_req *dtrq,
1071                                       struct tx_arg *ta_arg)
1072 {
1073         struct tgt_session_info *tsi;
1074         struct lu_target        *lut = tdtd->tdtd_lut;
1075         struct obd_export       *export;
1076         struct cfs_hash         *hash;
1077         struct top_thandle      *top_th;
1078         struct lsd_reply_data   *lrd;
1079         size_t                  size;
1080
1081         tsi = tgt_ses_info(env);
1082         if (tsi->tsi_exp != NULL)
1083                 return;
1084
1085         size = ta_arg->u.write.buf.lb_len;
1086         lrd = ta_arg->u.write.buf.lb_buf;
1087         if (size != sizeof(*lrd) || lrd == NULL)
1088                 return;
1089
1090         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1091         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1092         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1093         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1094         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1095
1096         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1097                 return;
1098
1099         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1100         if (hash == NULL)
1101                 return;
1102
1103         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1104         if (export == NULL) {
1105                 cfs_hash_putref(hash);
1106                 return;
1107         }
1108
1109         tsi->tsi_exp = export;
1110         tsi->tsi_xid = lrd->lrd_xid;
1111         tsi->tsi_opdata = lrd->lrd_data;
1112         tsi->tsi_result = lrd->lrd_result;
1113         tsi->tsi_client_gen = lrd->lrd_client_gen;
1114         dtrq->dtrq_xid = lrd->lrd_xid;
1115         top_th = container_of(th, struct top_thandle, tt_super);
1116         top_th->tt_master_sub_thandle = master_th;
1117         cfs_hash_putref(hash);
1118 }
1119
1120 /**
1121  * Execute updates in the update replay records
1122  *
1123  * Declare distribute txn replay by update records and add the updates
1124  * to the execution list. Note: it will check if the update has been
1125  * committed, and only execute the updates if it is not committed to
1126  * disk.
1127  *
1128  * \param[in] env       execution environment
1129  * \param[in] tdtd      distribute txn replay data which hold all of replay
1130  *                      reqs and all replay parameters.
1131  * \param[in] dtrq      distribute transaction replay req.
1132  * \param[in] ta        thandle execute args.
1133  *
1134  * \retval              0 if declare succeeds.
1135  * \retval              negative errno if declare fails.
1136  */
1137 static int update_recovery_exec(const struct lu_env *env,
1138                                 struct target_distribute_txn_data *tdtd,
1139                                 struct distribute_txn_replay_req *dtrq,
1140                                 struct thandle_exec_args *ta)
1141 {
1142         struct llog_update_record *lur = dtrq->dtrq_lur;
1143         struct update_records   *records = &lur->lur_update_rec;
1144         struct update_ops       *ops = &records->ur_ops;
1145         struct update_params    *params = update_records_get_params(records);
1146         struct top_thandle      *top_th = container_of(ta->ta_handle,
1147                                                        struct top_thandle,
1148                                                        tt_super);
1149         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1150         struct update_op        *op;
1151         unsigned int            i;
1152         int                     rc = 0;
1153         ENTRY;
1154
1155         /* These records have been swabbed in llog_cat_process() */
1156         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1157              i++, op = update_op_next_op(op)) {
1158                 struct lu_fid           *fid = &op->uop_fid;
1159                 struct dt_object        *dt_obj;
1160                 struct dt_object        *sub_dt_obj;
1161                 struct dt_device        *sub_dt;
1162                 struct sub_thandle      *st;
1163
1164                 if (op->uop_type == OUT_NOOP)
1165                         continue;
1166
1167                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1168                 if (IS_ERR(dt_obj)) {
1169                         rc = PTR_ERR(dt_obj);
1170                         break;
1171                 }
1172                 sub_dt_obj = dt_object_child(dt_obj);
1173
1174                 /* Create sub thandle if not */
1175                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1176                 st = lookup_sub_thandle(tmt, sub_dt);
1177                 if (st == NULL) {
1178                         st = create_sub_thandle(tmt, sub_dt);
1179                         if (IS_ERR(st))
1180                                 GOTO(next, rc = PTR_ERR(st));
1181                 }
1182
1183                 /* check if updates on the OSD/OSP are committed */
1184                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1185                 if (rc == 0)
1186                         /* If this is committed, goto next */
1187                         goto next;
1188
1189                 if (rc < 0)
1190                         GOTO(next, rc);
1191
1192                 /* Create thandle for sub thandle if needed */
1193                 if (st->st_sub_th == NULL) {
1194                         rc = sub_thandle_trans_create(env, top_th, st);
1195                         if (rc != 0)
1196                                 GOTO(next, rc);
1197                 }
1198
1199                 CDEBUG(D_HA, "replay %uth update\n", i);
1200                 switch (op->uop_type) {
1201                 case OUT_CREATE:
1202                         rc = update_recovery_create(env, sub_dt_obj,
1203                                                     op, params, ta,
1204                                                     st->st_sub_th);
1205                         break;
1206                 case OUT_DESTROY:
1207                         rc = update_recovery_destroy(env, sub_dt_obj,
1208                                                      op, params, ta,
1209                                                      st->st_sub_th);
1210                         break;
1211                 case OUT_REF_ADD:
1212                         rc = update_recovery_ref_add(env, sub_dt_obj,
1213                                                      op, params, ta,
1214                                                      st->st_sub_th);
1215                         break;
1216                 case OUT_REF_DEL:
1217                         rc = update_recovery_ref_del(env, sub_dt_obj,
1218                                                      op, params, ta,
1219                                                      st->st_sub_th);
1220                         break;
1221                 case OUT_ATTR_SET:
1222                         rc = update_recovery_attr_set(env, sub_dt_obj,
1223                                                       op, params, ta,
1224                                                       st->st_sub_th);
1225                         break;
1226                 case OUT_XATTR_SET:
1227                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1228                                                        op, params, ta,
1229                                                        st->st_sub_th);
1230                         break;
1231                 case OUT_INDEX_INSERT:
1232                         rc = update_recovery_index_insert(env, sub_dt_obj,
1233                                                           op, params, ta,
1234                                                           st->st_sub_th);
1235                         break;
1236                 case OUT_INDEX_DELETE:
1237                         rc = update_recovery_index_delete(env, sub_dt_obj,
1238                                                           op, params, ta,
1239                                                           st->st_sub_th);
1240                         break;
1241                 case OUT_WRITE:
1242                         rc = update_recovery_write(env, sub_dt_obj,
1243                                                    op, params, ta,
1244                                                    st->st_sub_th);
1245                         break;
1246                 case OUT_XATTR_DEL:
1247                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1248                                                        op, params, ta,
1249                                                        st->st_sub_th);
1250                         break;
1251                 default:
1252                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1253                         rc = -EINVAL;
1254                         break;
1255                 }
1256 next:
1257                 lu_object_put(env, &dt_obj->do_lu);
1258                 if (rc < 0)
1259                         break;
1260         }
1261
1262         ta->ta_handle->th_result = rc;
1263         RETURN(rc);
1264 }
1265
1266 /**
1267  * redo updates on MDT if needed.
1268  *
1269  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1270  * this function to replay distribute txn updates on all MDTs. It only replay
1271  * updates on the MDT where the update record is missing.
1272  *
1273  * If the update already exists on the MDT, then it does not need replay the
1274  * updates on that MDT, and only mark the sub transaction has been committed
1275  * there.
1276  *
1277  * \param[in] env       execution environment
1278  * \param[in] tdtd      target distribute txn data, which holds the replay list
1279  *                      and all parameters needed by replay process.
1280  * \param[in] dtrq      distribute txn replay req.
1281  *
1282  * \retval              0 if replay succeeds.
1283  * \retval              negative errno if replay failes.
1284  */
1285 int distribute_txn_replay_handle(struct lu_env *env,
1286                                  struct target_distribute_txn_data *tdtd,
1287                                  struct distribute_txn_replay_req *dtrq)
1288 {
1289         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1290         struct thandle_exec_args *ta;
1291         struct lu_context       session_env;
1292         struct thandle          *th = NULL;
1293         struct top_thandle      *top_th;
1294         struct top_multiple_thandle *tmt;
1295         struct thandle_update_records *tur = NULL;
1296         int                     i;
1297         int                     rc = 0;
1298         ENTRY;
1299
1300         /* initialize session, it is needed for the handler of target */
1301         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1302         if (rc) {
1303                 CERROR("%s: failure to initialize session: rc = %d\n",
1304                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1305                 RETURN(rc);
1306         }
1307         lu_context_enter(&session_env);
1308         env->le_ses = &session_env;
1309         lu_env_refill(env);
1310         update_records_dump(records, D_HA, true);
1311         th = top_trans_create(env, NULL);
1312         if (IS_ERR(th))
1313                 GOTO(exit_session, rc = PTR_ERR(th));
1314
1315         ta = &update_env_info(env)->uti_tea;
1316         ta->ta_argno = 0;
1317
1318         update_env_info(env)->uti_dtrq = dtrq;
1319         /* Create distribute transaction structure for this top thandle */
1320         top_th = container_of(th, struct top_thandle, tt_super);
1321         rc = top_trans_create_tmt(env, top_th);
1322         if (rc < 0)
1323                 GOTO(stop_trans, rc);
1324
1325         th->th_dev = tdtd->tdtd_dt;
1326         ta->ta_handle = th;
1327
1328         /* check if the distribute transaction has been committed */
1329         tmt = top_th->tt_multiple_thandle;
1330         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1331         tmt->tmt_batchid = dtrq->dtrq_batchid;
1332         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1333
1334         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1335                 tmt->tmt_committed = 1;
1336
1337         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1338         if (rc < 0)
1339                 GOTO(stop_trans, rc);
1340
1341         /* If no updates are needed to be replayed, then mark this records as
1342          * committed, so commit thread distribute_txn_commit_thread() will
1343          * delete the record */
1344         if (ta->ta_argno == 0)
1345                 tmt->tmt_committed = 1;
1346
1347         tur = &update_env_info(env)->uti_tur;
1348         tur->tur_update_records = dtrq->dtrq_lur;
1349         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1350         tur->tur_update_params = NULL;
1351         tur->tur_update_param_count = 0;
1352         tmt->tmt_update_records = tur;
1353
1354         distribute_txn_insert_by_batchid(tmt);
1355         rc = top_trans_start(env, NULL, th);
1356         if (rc < 0)
1357                 GOTO(stop_trans, rc);
1358
1359         for (i = 0; i < ta->ta_argno; i++) {
1360                 struct tx_arg           *ta_arg;
1361                 struct dt_object        *dt_obj;
1362                 struct dt_device        *sub_dt;
1363                 struct sub_thandle      *st;
1364
1365                 ta_arg = ta->ta_args[i];
1366                 dt_obj = ta_arg->object;
1367
1368                 LASSERT(tmt->tmt_committed == 0);
1369                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1370                 st = lookup_sub_thandle(tmt, sub_dt);
1371
1372                 LASSERT(st != NULL);
1373                 LASSERT(st->st_sub_th != NULL);
1374                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1375                                              ta->ta_args[i]);
1376
1377                 /* If the update is to update the reply data, then
1378                  * we need set the session information, so
1379                  * tgt_last_rcvd_update() can be called correctly */
1380                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1381                         update_recovery_update_ses(env, tdtd, th,
1382                                                    st->st_sub_th, dtrq, ta_arg);
1383
1384                 if (unlikely(rc < 0)) {
1385                         CDEBUG(D_HA, "error during execution of #%u from"
1386                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1387                                ta->ta_args[i]->line, rc);
1388                         while (--i > 0) {
1389                                 if (ta->ta_args[i]->undo_fn != NULL) {
1390                                         dt_obj = ta->ta_args[i]->object;
1391                                         sub_dt =
1392                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1393                                         st = lookup_sub_thandle(tmt, sub_dt);
1394                                         LASSERT(st != NULL);
1395                                         LASSERT(st->st_sub_th != NULL);
1396
1397                                         ta->ta_args[i]->undo_fn(env,
1398                                                                st->st_sub_th,
1399                                                                ta->ta_args[i]);
1400                                 } else {
1401                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1402                                              dt_obd_name(ta->ta_handle->th_dev),
1403                                                ta->ta_args[i]->file,
1404                                                ta->ta_args[i]->line, -ENOTSUPP);
1405                                 }
1406                         }
1407                         break;
1408                 }
1409                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1410                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1411         }
1412
1413 stop_trans:
1414         if (rc < 0)
1415                 th->th_result = rc;
1416         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1417         for (i = 0; i < ta->ta_argno; i++) {
1418                 if (ta->ta_args[i]->object != NULL) {
1419                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1420                         ta->ta_args[i]->object = NULL;
1421                 }
1422         }
1423
1424         if (tur != NULL)
1425                 tur->tur_update_records = NULL;
1426
1427         if (tgt_ses_info(env)->tsi_exp != NULL) {
1428                 class_export_put(tgt_ses_info(env)->tsi_exp);
1429                 tgt_ses_info(env)->tsi_exp = NULL;
1430         }
1431 exit_session:
1432         lu_context_exit(&session_env);
1433         lu_context_fini(&session_env);
1434         RETURN(rc);
1435 }
1436 EXPORT_SYMBOL(distribute_txn_replay_handle);