Whamcloud - gitweb
LU-6966 test: Dash in FSNAME causes warning in testframework
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2015, Intel Corporation.
24  */
25
/*
 * lustre/target/update_recovery.c
 *
 * This file implements the methods to handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. These replays come either from client replay requests or
 * from update replay records (for distributed transactions) in the update
 * log. For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * of the MDTs. For each distributed operation, it checks the updates on all
 * MDTs; if some update records are missing on some MDTs, the replay thread
 * redoes the updates on those MDTs.
 *
 * Author: Di Wang <di.wang@intel.com>
 */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <lustre_update.h>
48 #include <lustre_swab.h>
49 #include <md_object.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include "tgt_internal.h"
53
54 /**
55  * Lookup distribute_txn_replay req
56  *
57  * Lookup distribute_txn_replay in the replay list by batchid.
58  * It is assumed the list has been locked before calling this function.
59  *
60  * \param[in] tdtd      distribute_txn_data, which holds the replay
61  *                      list.
62  * \param[in] batchid   batchid used by lookup.
63  *
64  * \retval              pointer of the replay if succeeds.
65  * \retval              NULL if can not find it.
66  */
67 static struct distribute_txn_replay_req *
68 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
69 {
70         struct distribute_txn_replay_req        *tmp;
71         struct distribute_txn_replay_req        *dtrq = NULL;
72
73         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
74                 if (tmp->dtrq_batchid == batchid) {
75                         dtrq = tmp;
76                         break;
77                 }
78         }
79         return dtrq;
80 }
81
82 /**
83  * insert distribute txn replay req
84  *
85  * Insert distribute txn replay to the replay list, and it assumes the
86  * list has been looked. Note: the replay list is a sorted list, which
87  * is sorted by master transno. It is assumed the replay list has been
88  * locked before calling this function.
89  *
90  * \param[in] tdtd      target distribute txn data where replay list is
91  * \param[in] new       distribute txn replay to be inserted
92  *
93  * \retval              0 if insertion succeeds
94  * \retval              EEXIST if the dtrq already exists
95  */
96 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
97                         struct distribute_txn_replay_req *new)
98 {
99         struct distribute_txn_replay_req *iter;
100
101         /* Check if the dtrq has been added to the list */
102         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
103         if (iter != NULL)
104                 return -EEXIST;
105
106         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
107                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
108                         continue;
109
110                 /* If there are mulitple replay req with same transno, then
111                  * sort them with batchid */
112                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
113                     iter->dtrq_batchid > new->dtrq_batchid)
114                         continue;
115
116                 list_add(&new->dtrq_list, &iter->dtrq_list);
117                 break;
118         }
119
120         if (list_empty(&new->dtrq_list))
121                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
122
123         return 0;
124 }
125
126 /**
127  * create distribute txn replay req
128  *
129  * Allocate distribute txn replay req according to the update records.
130  *
131  * \param[in] tdtd      target distribute txn data where replay list is.
132  * \param[in] record    update records from the update log.
133  *
134  * \retval              the pointer of distribute txn replay req if
135  *                      the creation succeeds.
136  * \retval              NULL if the creation fails.
137  */
138 static struct distribute_txn_replay_req *
139 dtrq_create(struct target_distribute_txn_data *tdtd,
140             struct llog_update_record *lur)
141 {
142         struct distribute_txn_replay_req *new;
143
144         OBD_ALLOC_PTR(new);
145         if (new == NULL)
146                 RETURN(ERR_PTR(-ENOMEM));
147
148         new->dtrq_lur_size = llog_update_record_size(lur);
149         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
150         if (new->dtrq_lur == NULL) {
151                 OBD_FREE_PTR(new);
152                 RETURN(ERR_PTR(-ENOMEM));
153         }
154
155         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
156
157         /* If the transno in the update record is 0, it means the
158          * update are from master MDT, and it will use the master
159          * last committed transno as its master transno. Later, if
160          * the update records are gotten from slave MDTs, then these
161          * transno will be replaced.
162          * See insert_update_records_to_replay_list(). */
163         if (lur->lur_update_rec.ur_master_transno == 0) {
164                 new->dtrq_lur->lur_update_rec.ur_master_transno =
165                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
166                 new->dtrq_master_transno =
167                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
168         } else {
169                 new->dtrq_master_transno =
170                                 lur->lur_update_rec.ur_master_transno;
171         }
172
173         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
174
175         spin_lock_init(&new->dtrq_sub_list_lock);
176         INIT_LIST_HEAD(&new->dtrq_sub_list);
177         INIT_LIST_HEAD(&new->dtrq_list);
178
179         RETURN(new);
180 }
181
182 /**
183  * Lookup distribute sub replay
184  *
185  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
186  * mdt_index.
187  *
188  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
189  * \param[in] mdt_index                 the mdt_index as the key of lookup
190  *
191  * \retval              the pointer of sub replay if it can be found.
192  * \retval              NULL if it can not find.
193  */
194 struct distribute_txn_replay_req_sub *
195 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
196 {
197         struct distribute_txn_replay_req_sub *dtrqs = NULL;
198         struct distribute_txn_replay_req_sub *tmp;
199
200         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
201                 if (tmp->dtrqs_mdt_index == mdt_index) {
202                         dtrqs = tmp;
203                         break;
204                 }
205         }
206         return dtrqs;
207 }
208
209 /**
210  * Try to add cookie to sub distribute txn request
211  *
212  * Check if the update log cookie has been added to the request, if not,
213  * add it to the dtrqs_cookie_list.
214  *
215  * \param[in] dtrqs     sub replay req where cookies to be added.
216  * \param[in] cookie    cookie to be added.
217  *
218  * \retval              0 if the cookie is adding succeeds.
219  * \retval              negative errno if adding fails.
220  */
221 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
222                                struct llog_cookie *cookie)
223 {
224         struct sub_thandle_cookie *new;
225
226         OBD_ALLOC_PTR(new);
227         if (new == NULL)
228                 return -ENOMEM;
229
230         INIT_LIST_HEAD(&new->stc_list);
231         new->stc_cookie = *cookie;
232         /* Note: only single thread will access one sub_request each time,
233          * so no need lock here */
234         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
235
236         return 0;
237 }
238
239 /**
240  * Insert distribute txn sub req replay
241  *
242  * Allocate sub replay req and insert distribute txn replay list.
243  *
244  * \param[in] dtrq      d to be added
245  * \param[in] cookie    the cookie of the update record
246  * \param[in] mdt_index the mdt_index of the update record
247  *
248  * \retval              0 if the adding succeeds.
249  * \retval              negative errno if the adding fails.
250  */
251 static int
252 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
253                            struct llog_cookie *cookie,
254                            __u32 mdt_index)
255 {
256         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
257         struct distribute_txn_replay_req_sub    *new;
258         int                                     rc;
259         ENTRY;
260
261         spin_lock(&dtrq->dtrq_sub_list_lock);
262         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
263         spin_unlock(&dtrq->dtrq_sub_list_lock);
264         if (dtrqs != NULL) {
265                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
266                 RETURN(0);
267         }
268
269         OBD_ALLOC_PTR(new);
270         if (new == NULL)
271                 RETURN(-ENOMEM);
272
273         INIT_LIST_HEAD(&new->dtrqs_list);
274         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
275         new->dtrqs_mdt_index = mdt_index;
276         spin_lock(&dtrq->dtrq_sub_list_lock);
277         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
278         if (dtrqs == NULL) {
279                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
280                 dtrqs = new;
281         } else {
282                 OBD_FREE_PTR(new);
283         }
284         spin_unlock(&dtrq->dtrq_sub_list_lock);
285
286         rc = dtrq_sub_add_cookie(dtrqs, cookie);
287
288         RETURN(rc);
289 }
290
291 /**
292  * append updates to the current replay updates
293  *
294  * Append more updates to the existent replay update. And this is only
295  * used when combining mulitple updates into one large updates during
296  * replay.
297  *
298  * \param[in] dtrq      the update replay request where the new update
299  *                      records will be added.
300  * \param[in] lur       the new update record.
301  *
302  * \retval              0 if appending succeeds.
303  * \retval              negative errno if appending fails.
304  */
305 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
306                                struct update_records *record)
307 {
308         struct llog_update_record *new_lur;
309         size_t lur_size = dtrq->dtrq_lur_size;
310         void *ptr;
311         ENTRY;
312
313         /* Because several threads might retrieve the same records from
314          * different targets, and we only need one copy of records. So
315          * we will check if the records is in the next one, if not, just
316          * skip it */
317         spin_lock(&dtrq->dtrq_sub_list_lock);
318         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
319                 spin_unlock(&dtrq->dtrq_sub_list_lock);
320                 RETURN(0);
321         }
322         dtrq->dtrq_lur->lur_update_rec.ur_index++;
323         spin_unlock(&dtrq->dtrq_sub_list_lock);
324
325         lur_size += update_records_size(record);
326         OBD_ALLOC_LARGE(new_lur, lur_size);
327         if (new_lur == NULL) {
328                 spin_lock(&dtrq->dtrq_sub_list_lock);
329                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
330                 spin_unlock(&dtrq->dtrq_sub_list_lock);
331                 RETURN(-ENOMEM);
332         }
333
334         /* Copy the old and new records to the new allocated buffer */
335         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
336         ptr = (char *)&new_lur->lur_update_rec +
337                 update_records_size(&new_lur->lur_update_rec);
338         memcpy(ptr, &record->ur_ops,
339                update_records_size(record) -
340                offsetof(struct update_records, ur_ops));
341
342         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
343         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
344         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
345
346         /* Replace the records */
347         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
348         dtrq->dtrq_lur = new_lur;
349         dtrq->dtrq_lur_size = lur_size;
350         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
351         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
352         RETURN(0);
353 }
354
355 /**
356  * Insert update records to the replay list.
357  *
358  * Allocate distribute txn replay req and insert it into the replay
359  * list, then insert the update records into the replay req.
360  *
361  * \param[in] tdtd      distribute txn replay data where the replay list
362  *                      is.
363  * \param[in] record    the update record
364  * \param[in] cookie    cookie of the record
365  * \param[in] index     mdt index of the record
366  *
367  * \retval              0 if the adding succeeds.
368  * \retval              negative errno if the adding fails.
369  */
370 int
371 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
372                                      struct llog_update_record *lur,
373                                      struct llog_cookie *cookie,
374                                      __u32 mdt_index)
375 {
376         struct distribute_txn_replay_req *dtrq;
377         struct update_records *record = &lur->lur_update_rec;
378         bool replace_record = false;
379         int rc = 0;
380         ENTRY;
381
382         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
383                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
384                record->ur_batchid, record->ur_master_transno, mdt_index);
385
386         /* Update batchid if necessary */
387         spin_lock(&tdtd->tdtd_batchid_lock);
388         if (record->ur_batchid >= tdtd->tdtd_batchid) {
389                 CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
390                        tdtd->tdtd_lut->lut_obd->obd_name,
391                        tdtd->tdtd_batchid, record->ur_batchid);
392                 tdtd->tdtd_batchid = record->ur_batchid + 1;
393         }
394         spin_unlock(&tdtd->tdtd_batchid_lock);
395
396 again:
397         spin_lock(&tdtd->tdtd_replay_list_lock);
398         /* First try to build the replay update request with the records */
399         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
400         if (dtrq == NULL) {
401                 spin_unlock(&tdtd->tdtd_replay_list_lock);
402                 dtrq = dtrq_create(tdtd, lur);
403                 if (IS_ERR(dtrq))
404                         RETURN(PTR_ERR(dtrq));
405
406                 spin_lock(&tdtd->tdtd_replay_list_lock);
407                 rc = dtrq_insert(tdtd, dtrq);
408                 if (rc < 0) {
409                         spin_unlock(&tdtd->tdtd_replay_list_lock);
410                         dtrq_destroy(dtrq);
411                         if (rc == -EEXIST)
412                                 goto again;
413                         return rc;
414                 }
415         } else {
416                 /* If the master transno in update header is not
417                 * matched with the one in the record, then it means
418                 * the dtrq is originally created by master record,
419                 * so we need update master transno and reposition
420                 * the dtrq(by master transno) in the list and also
421                 * replace update record */
422                 if (record->ur_master_transno != 0 &&
423                     dtrq->dtrq_master_transno != record->ur_master_transno &&
424                     dtrq->dtrq_lur != NULL) {
425                         list_del_init(&dtrq->dtrq_list);
426                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
427                                                 record->ur_master_transno;
428
429                         dtrq->dtrq_master_transno = record->ur_master_transno;
430                         replace_record = true;
431                         /* try to insert again */
432                         rc = dtrq_insert(tdtd, dtrq);
433                         if (rc < 0) {
434                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
435                                 dtrq_destroy(dtrq);
436                                 return rc;
437                         }
438                 }
439         }
440         spin_unlock(&tdtd->tdtd_replay_list_lock);
441
442         /* Because there should be only thread access the update record, so
443          * we do not need lock here */
444         if (replace_record) {
445                 /* Replace the update record and master transno */
446                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
447                 dtrq->dtrq_lur = NULL;
448                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
449                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
450                 if (dtrq->dtrq_lur == NULL)
451                         return -ENOMEM;
452
453                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
454         }
455
456         /* This is a partial update records, let's try to append
457          * the record to the current replay request */
458         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
459                 rc = dtrq_append_updates(dtrq, record);
460
461         /* Then create and add sub update request */
462         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
463
464         RETURN(rc);
465 }
466 EXPORT_SYMBOL(insert_update_records_to_replay_list);
467
468 /**
469  * Dump updates of distribute txns.
470  *
471  * Output all of recovery updates in the distribute txn list to the
472  * debug log.
473  *
474  * \param[in] tdtd      distribute txn data where all of distribute txn
475  *                      are listed.
476  * \param[in] mask      debug mask
477  */
478 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
479 {
480         struct distribute_txn_replay_req *dtrq;
481
482         spin_lock(&tdtd->tdtd_replay_list_lock);
483         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
484                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
485                                     false);
486         spin_unlock(&tdtd->tdtd_replay_list_lock);
487 }
488 EXPORT_SYMBOL(dtrq_list_dump);
489
490 /**
491  * Destroy distribute txn replay req
492  *
493  * Destroy distribute txn replay req and all of subs.
494  *
495  * \param[in] dtrq      distribute txn replqy req to be destroyed.
496  */
497 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
498 {
499         struct distribute_txn_replay_req_sub    *dtrqs;
500         struct distribute_txn_replay_req_sub    *tmp;
501
502         LASSERT(list_empty(&dtrq->dtrq_list));
503         spin_lock(&dtrq->dtrq_sub_list_lock);
504         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
505                 struct sub_thandle_cookie *stc;
506                 struct sub_thandle_cookie *tmp;
507
508                 list_del(&dtrqs->dtrqs_list);
509                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
510                                          stc_list) {
511                         list_del(&stc->stc_list);
512                         OBD_FREE_PTR(stc);
513                 }
514                 OBD_FREE_PTR(dtrqs);
515         }
516         spin_unlock(&dtrq->dtrq_sub_list_lock);
517
518         if (dtrq->dtrq_lur != NULL)
519                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
520
521         OBD_FREE_PTR(dtrq);
522 }
523 EXPORT_SYMBOL(dtrq_destroy);
524
525 /**
526  * Destroy all of replay req.
527  *
528  * Destroy all of replay req in the replay list.
529  *
530  * \param[in] tdtd      target distribute txn data where the replay list is.
531  */
532 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
533 {
534         struct distribute_txn_replay_req *dtrq;
535         struct distribute_txn_replay_req *tmp;
536
537         spin_lock(&tdtd->tdtd_replay_list_lock);
538         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
539                                  dtrq_list) {
540                 list_del_init(&dtrq->dtrq_list);
541                 dtrq_destroy(dtrq);
542         }
543         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
544                                  dtrq_list) {
545                 list_del_init(&dtrq->dtrq_list);
546                 dtrq_destroy(dtrq);
547         }
548         spin_unlock(&tdtd->tdtd_replay_list_lock);
549 }
550 EXPORT_SYMBOL(dtrq_list_destroy);
551
552 /**
553  * Get next req in the replay list
554  *
555  * Get next req needs to be replayed, since it is a sorted list
556  * (by master MDT transno)
557  *
558  * \param[in] tdtd      distribute txn data where the replay list is
559  *
560  * \retval              the pointer of update recovery header
561  */
562 struct distribute_txn_replay_req *
563 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
564 {
565         struct distribute_txn_replay_req *dtrq = NULL;
566
567         spin_lock(&tdtd->tdtd_replay_list_lock);
568         if (!list_empty(&tdtd->tdtd_replay_list)) {
569                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
570                                  struct distribute_txn_replay_req, dtrq_list);
571                 list_del_init(&dtrq->dtrq_list);
572         }
573         spin_unlock(&tdtd->tdtd_replay_list_lock);
574
575         return dtrq;
576 }
577 EXPORT_SYMBOL(distribute_txn_get_next_req);
578
579 /**
580  * Get next transno in the replay list, because this is the sorted
581  * list, so it will return the transno of next req in the list.
582  *
583  * \param[in] tdtd      distribute txn data where the replay list is
584  *
585  * \retval              the transno of next update in the list
586  */
587 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
588 {
589         struct distribute_txn_replay_req        *dtrq = NULL;
590         __u64                                   transno = 0;
591
592         spin_lock(&tdtd->tdtd_replay_list_lock);
593         if (!list_empty(&tdtd->tdtd_replay_list)) {
594                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
595                                  struct distribute_txn_replay_req, dtrq_list);
596                 transno = dtrq->dtrq_master_transno;
597         }
598         spin_unlock(&tdtd->tdtd_replay_list_lock);
599
600         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
601                tdtd->tdtd_lut->lut_obd->obd_name, transno);
602         return transno;
603 }
604 EXPORT_SYMBOL(distribute_txn_get_next_transno);
605
606 struct distribute_txn_replay_req *
607 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
608                                   __u64 xid)
609 {
610         struct distribute_txn_replay_req *dtrq = NULL;
611         struct distribute_txn_replay_req *iter;
612
613         spin_lock(&tdtd->tdtd_replay_list_lock);
614         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
615                 if (iter->dtrq_xid == xid) {
616                         dtrq = iter;
617                         break;
618                 }
619         }
620         spin_unlock(&tdtd->tdtd_replay_list_lock);
621         return dtrq;
622 }
623
624 bool is_req_replayed_by_update(struct ptlrpc_request *req)
625 {
626         struct lu_target *tgt = class_exp2tgt(req->rq_export);
627         struct distribute_txn_replay_req *dtrq;
628
629         if (tgt->lut_tdtd == NULL)
630                 return false;
631
632         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
633         if (dtrq == NULL)
634                 return false;
635
636         return true;
637 }
638 EXPORT_SYMBOL(is_req_replayed_by_update);
639
640 /**
641  * Check if the update of one object is committed
642  *
643  * Check whether the update for the object is committed by checking whether
644  * the correspondent sub exists in the replay req. If it is committed, mark
645  * the committed flag in correspondent the sub thandle.
646  *
647  * \param[in] env       execution environment
648  * \param[in] dtrq      replay request
649  * \param[in] dt_obj    object for the update
650  * \param[in] top_th    top thandle
651  * \param[in] sub_th    sub thandle which the update belongs to
652  *
653  * \retval              1 if the update is not committed.
654  * \retval              0 if the update is committed.
655  * \retval              negative errno if some other failures happen.
656  */
657 static int update_is_committed(const struct lu_env *env,
658                                struct distribute_txn_replay_req *dtrq,
659                                struct dt_object *dt_obj,
660                                struct top_thandle *top_th,
661                                struct sub_thandle *st)
662 {
663         struct seq_server_site  *seq_site;
664         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
665         struct distribute_txn_replay_req_sub    *dtrqs;
666         __u32                   mdt_index;
667         ENTRY;
668
669         if (st->st_sub_th != NULL)
670                 RETURN(1);
671
672         if (st->st_committed)
673                 RETURN(0);
674
675         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
676         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
677                 mdt_index = fid_oid(fid);
678         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
679                 mdt_index = seq_site->ss_node_id;
680         } else {
681                 struct lu_server_fld *fld;
682                 struct lu_seq_range range = {0};
683                 int rc;
684
685                 fld = seq_site->ss_server_fld;
686                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
687                 LASSERT(fld->lsf_seq_lookup != NULL);
688                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
689                                          &range);
690                 if (rc < 0)
691                         RETURN(rc);
692                 mdt_index = range.lsr_index;
693         }
694
695         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
696         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
697                 st->st_committed = 1;
698                 if (dtrqs != NULL) {
699                         struct sub_thandle_cookie *stc;
700                         struct sub_thandle_cookie *tmp;
701
702                         list_for_each_entry_safe(stc, tmp,
703                                                  &dtrqs->dtrqs_cookie_list,
704                                                  stc_list)
705                                 list_move(&stc->stc_list, &st->st_cookie_list);
706                 }
707                 RETURN(0);
708         }
709
710         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
711                mdt_index);
712
713         RETURN(1);
714 }
715
716 /**
717  * Implementation of different update methods for update recovery.
718  *
719  * These following functions update_recovery_$(update_name) implement
720  * different updates recovery methods. They will extract the parameters
721  * from the common parameters area and call correspondent dt API to redo
722  * the update.
723  *
724  * \param[in] env       execution environment
725  * \param[in] op        update operation to be replayed
726  * \param[in] params    common update parameters which holds all parameters
727  *                      of the operation
728  * \param[in] th        transaction handle
729  * \param[in] declare   indicate it will do declare or real execution, true
730  *                      means declare, false means real execution
731  *
732  * \retval              0 if it succeeds.
733  * \retval              negative errno if it fails.
734  */
735 static int update_recovery_create(const struct lu_env *env,
736                                   struct dt_object *dt_obj,
737                                   const struct update_op *op,
738                                   const struct update_params *params,
739                                   struct thandle_exec_args *ta,
740                                   struct thandle *th)
741 {
742         struct update_thread_info *uti = update_env_info(env);
743         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
744         struct lu_attr          *attr = &uti->uti_attr;
745         struct obdo             *wobdo;
746         struct obdo             *lobdo = &uti->uti_obdo;
747         struct dt_object_format dof;
748         __u16                   size;
749         unsigned int            param_count;
750         int rc;
751         ENTRY;
752
753         if (dt_object_exists(dt_obj))
754                 RETURN(-EEXIST);
755
756         param_count = lur->lur_update_rec.ur_param_count;
757         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
758                                             param_count, &size);
759         if (wobdo == NULL)
760                 RETURN(-EIO);
761         if (size != sizeof(*wobdo))
762                 RETURN(-EIO);
763
764         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
765                 lustre_swab_obdo(wobdo);
766
767         lustre_get_wire_obdo(NULL, lobdo, wobdo);
768         la_from_obdo(attr, lobdo, lobdo->o_valid);
769
770         dof.dof_type = dt_mode_to_dft(attr->la_mode);
771
772         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
773                            ta, th, NULL, 0);
774
775         RETURN(rc);
776 }
777
778 static int update_recovery_destroy(const struct lu_env *env,
779                                    struct dt_object *dt_obj,
780                                    const struct update_op *op,
781                                    const struct update_params *params,
782                                    struct thandle_exec_args *ta,
783                                    struct thandle *th)
784 {
785         int rc;
786         ENTRY;
787
788         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
789
790         RETURN(rc);
791 }
792
793 static int update_recovery_ref_add(const struct lu_env *env,
794                                    struct dt_object *dt_obj,
795                                    const struct update_op *op,
796                                    const struct update_params *params,
797                                    struct thandle_exec_args *ta,
798                                    struct thandle *th)
799 {
800         int rc;
801         ENTRY;
802
803         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
804
805         RETURN(rc);
806 }
807
808 static int update_recovery_ref_del(const struct lu_env *env,
809                                    struct dt_object *dt_obj,
810                                    const struct update_op *op,
811                                    const struct update_params *params,
812                                    struct thandle_exec_args *ta,
813                                    struct thandle *th)
814 {
815         int rc;
816         ENTRY;
817
818         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
819
820         RETURN(rc);
821 }
822
823 static int update_recovery_attr_set(const struct lu_env *env,
824                                     struct dt_object *dt_obj,
825                                     const struct update_op *op,
826                                     const struct update_params *params,
827                                     struct thandle_exec_args *ta,
828                                     struct thandle *th)
829 {
830         struct update_thread_info *uti = update_env_info(env);
831         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
832         struct obdo     *wobdo;
833         struct obdo     *lobdo = &uti->uti_obdo;
834         struct lu_attr  *attr = &uti->uti_attr;
835         __u16           size;
836         unsigned int    param_count;
837         int             rc;
838         ENTRY;
839
840         param_count = lur->lur_update_rec.ur_param_count;
841         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
842                                             param_count, &size);
843         if (wobdo == NULL)
844                 RETURN(-EIO);
845         if (size != sizeof(*wobdo))
846                 RETURN(-EIO);
847
848         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
849                 lustre_swab_obdo(wobdo);
850
851         lustre_get_wire_obdo(NULL, lobdo, wobdo);
852         la_from_obdo(attr, lobdo, lobdo->o_valid);
853
854         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
855
856         RETURN(rc);
857 }
858
859 static int update_recovery_xattr_set(const struct lu_env *env,
860                                      struct dt_object *dt_obj,
861                                      const struct update_op *op,
862                                      const struct update_params *params,
863                                      struct thandle_exec_args *ta,
864                                      struct thandle *th)
865 {
866         struct update_thread_info *uti = update_env_info(env);
867         char            *buf;
868         char            *name;
869         int             fl;
870         __u16           size;
871         __u32           param_count;
872         int             rc;
873         ENTRY;
874
875         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
876         name = update_params_get_param_buf(params,
877                                            op->uop_params_off[0],
878                                            param_count, &size);
879         if (name == NULL)
880                 RETURN(-EIO);
881
882         buf = update_params_get_param_buf(params,
883                                           op->uop_params_off[1],
884                                           param_count, &size);
885         if (buf == NULL)
886                 RETURN(-EIO);
887
888         uti->uti_buf.lb_buf = buf;
889         uti->uti_buf.lb_len = (size_t)size;
890
891         buf = update_params_get_param_buf(params, op->uop_params_off[2],
892                                           param_count, &size);
893         if (buf == NULL)
894                 RETURN(-EIO);
895         if (size != sizeof(fl))
896                 RETURN(-EIO);
897
898         fl = le32_to_cpu(*(int *)buf);
899
900         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
901                               NULL, 0);
902
903         RETURN(rc);
904 }
905
906 static int update_recovery_index_insert(const struct lu_env *env,
907                                         struct dt_object *dt_obj,
908                                         const struct update_op *op,
909                                         const struct update_params *params,
910                                         struct thandle_exec_args *ta,
911                                         struct thandle *th)
912 {
913         struct update_thread_info *uti = update_env_info(env);
914         struct lu_fid           *fid;
915         char                    *name;
916         __u32                   param_count;
917         __u32                   *ptype;
918         __u32                   type;
919         __u16                   size;
920         int rc;
921         ENTRY;
922
923         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
924         name = update_params_get_param_buf(params, op->uop_params_off[0],
925                                            param_count, &size);
926         if (name == NULL)
927                 RETURN(-EIO);
928
929         fid = update_params_get_param_buf(params, op->uop_params_off[1],
930                                           param_count, &size);
931         if (fid == NULL)
932                 RETURN(-EIO);
933         if (size != sizeof(*fid))
934                 RETURN(-EIO);
935
936         fid_le_to_cpu(fid, fid);
937
938         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
939                                             param_count, &size);
940         if (ptype == NULL)
941                 RETURN(-EIO);
942         if (size != sizeof(*ptype))
943                 RETURN(-EIO);
944         type = le32_to_cpu(*ptype);
945
946         if (dt_try_as_dir(env, dt_obj) == 0)
947                 RETURN(-ENOTDIR);
948
949         uti->uti_rec.rec_fid = fid;
950         uti->uti_rec.rec_type = type;
951
952         rc = out_tx_index_insert(env, dt_obj,
953                                  (const struct dt_rec *)&uti->uti_rec,
954                                  (const struct dt_key *)name, ta, th,
955                                  NULL, 0);
956
957         RETURN(rc);
958 }
959
960 static int update_recovery_index_delete(const struct lu_env *env,
961                                         struct dt_object *dt_obj,
962                                         const struct update_op *op,
963                                         const struct update_params *params,
964                                         struct thandle_exec_args *ta,
965                                         struct thandle *th)
966 {
967         struct update_thread_info *uti = update_env_info(env);
968         __u32   param_count;
969         char    *name;
970         __u16   size;
971         int     rc;
972         ENTRY;
973
974         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
975         name = update_params_get_param_buf(params, op->uop_params_off[0],
976                                            param_count, &size);
977         if (name == NULL)
978                 RETURN(-EIO);
979
980         if (dt_try_as_dir(env, dt_obj) == 0)
981                 RETURN(-ENOTDIR);
982
983         rc = out_tx_index_delete(env, dt_obj,
984                                  (const struct dt_key *)name, ta, th, NULL, 0);
985
986         RETURN(rc);
987 }
988
989 static int update_recovery_write(const struct lu_env *env,
990                                  struct dt_object *dt_obj,
991                                  const struct update_op *op,
992                                  const struct update_params *params,
993                                  struct thandle_exec_args *ta,
994                                  struct thandle *th)
995 {
996         struct update_thread_info *uti = update_env_info(env);
997         char            *buf;
998         __u32           param_count;
999         __u64           pos;
1000         __u16           size;
1001         int rc;
1002         ENTRY;
1003
1004         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1005         buf = update_params_get_param_buf(params, op->uop_params_off[0],
1006                                           param_count, &size);
1007         if (buf == NULL)
1008                 RETURN(-EIO);
1009
1010         uti->uti_buf.lb_buf = buf;
1011         uti->uti_buf.lb_len = size;
1012
1013         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1014                                           param_count, &size);
1015         if (buf == NULL)
1016                 RETURN(-EIO);
1017
1018         pos = le64_to_cpu(*(__u64 *)buf);
1019
1020         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1021                           ta, th, NULL, 0);
1022
1023         RETURN(rc);
1024 }
1025
1026 static int update_recovery_xattr_del(const struct lu_env *env,
1027                                      struct dt_object *dt_obj,
1028                                      const struct update_op *op,
1029                                      const struct update_params *params,
1030                                      struct thandle_exec_args *ta,
1031                                      struct thandle *th)
1032 {
1033         struct update_thread_info *uti = update_env_info(env);
1034         __u32   param_count;
1035         char    *name;
1036         __u16   size;
1037         int     rc;
1038         ENTRY;
1039
1040         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1041         name = update_params_get_param_buf(params, op->uop_params_off[0],
1042                                            param_count, &size);
1043         if (name == NULL)
1044                 RETURN(-EIO);
1045
1046         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1047
1048         RETURN(rc);
1049 }
1050
1051 /**
1052  * Update session information
1053  *
1054  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1055  * can be called correctly during update replay.
1056  *
1057  * \param[in] env       execution environment.
1058  * \param[in] tdtd      distribute data structure of the recovering tgt.
1059  * \param[in] th        thandle of this update replay.
1060  * \param[in] master_th master sub thandle.
1061  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1062  *                      reply data.
1063  */
1064 static void update_recovery_update_ses(struct lu_env *env,
1065                                       struct target_distribute_txn_data *tdtd,
1066                                       struct thandle *th,
1067                                       struct thandle *master_th,
1068                                       struct distribute_txn_replay_req *dtrq,
1069                                       struct tx_arg *ta_arg)
1070 {
1071         struct tgt_session_info *tsi;
1072         struct lu_target        *lut = tdtd->tdtd_lut;
1073         struct obd_export       *export;
1074         struct cfs_hash         *hash;
1075         struct top_thandle      *top_th;
1076         struct lsd_reply_data   *lrd;
1077         size_t                  size;
1078
1079         tsi = tgt_ses_info(env);
1080         if (tsi->tsi_exp != NULL)
1081                 return;
1082
1083         size = ta_arg->u.write.buf.lb_len;
1084         lrd = ta_arg->u.write.buf.lb_buf;
1085         if (size != sizeof(*lrd) || lrd == NULL)
1086                 return;
1087
1088         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1089         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1090         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1091         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1092         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1093
1094         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1095                 return;
1096
1097         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1098         if (hash == NULL)
1099                 return;
1100
1101         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1102         if (export == NULL) {
1103                 cfs_hash_putref(hash);
1104                 return;
1105         }
1106
1107         tsi->tsi_exp = export;
1108         tsi->tsi_xid = lrd->lrd_xid;
1109         tsi->tsi_opdata = lrd->lrd_data;
1110         tsi->tsi_result = lrd->lrd_result;
1111         tsi->tsi_client_gen = lrd->lrd_client_gen;
1112         dtrq->dtrq_xid = lrd->lrd_xid;
1113         top_th = container_of(th, struct top_thandle, tt_super);
1114         top_th->tt_master_sub_thandle = master_th;
1115         cfs_hash_putref(hash);
1116 }
1117
1118 /**
1119  * Execute updates in the update replay records
1120  *
1121  * Declare distribute txn replay by update records and add the updates
1122  * to the execution list. Note: it will check if the update has been
1123  * committed, and only execute the updates if it is not committed to
1124  * disk.
1125  *
1126  * \param[in] env       execution environment
1127  * \param[in] tdtd      distribute txn replay data which hold all of replay
1128  *                      reqs and all replay parameters.
1129  * \param[in] dtrq      distribute transaction replay req.
1130  * \param[in] ta        thandle execute args.
1131  *
1132  * \retval              0 if declare succeeds.
1133  * \retval              negative errno if declare fails.
1134  */
1135 static int update_recovery_exec(const struct lu_env *env,
1136                                 struct target_distribute_txn_data *tdtd,
1137                                 struct distribute_txn_replay_req *dtrq,
1138                                 struct thandle_exec_args *ta)
1139 {
1140         struct llog_update_record *lur = dtrq->dtrq_lur;
1141         struct update_records   *records = &lur->lur_update_rec;
1142         struct update_ops       *ops = &records->ur_ops;
1143         struct update_params    *params = update_records_get_params(records);
1144         struct top_thandle      *top_th = container_of(ta->ta_handle,
1145                                                        struct top_thandle,
1146                                                        tt_super);
1147         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1148         struct update_op        *op;
1149         unsigned int            i;
1150         int                     rc = 0;
1151         ENTRY;
1152
1153         /* These records have been swabbed in llog_cat_process() */
1154         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1155              i++, op = update_op_next_op(op)) {
1156                 struct lu_fid           *fid = &op->uop_fid;
1157                 struct dt_object        *dt_obj;
1158                 struct dt_object        *sub_dt_obj;
1159                 struct dt_device        *sub_dt;
1160                 struct sub_thandle      *st;
1161
1162                 if (op->uop_type == OUT_NOOP)
1163                         continue;
1164
1165                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1166                 if (IS_ERR(dt_obj)) {
1167                         rc = PTR_ERR(dt_obj);
1168                         break;
1169                 }
1170                 sub_dt_obj = dt_object_child(dt_obj);
1171
1172                 /* Create sub thandle if not */
1173                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1174                 st = lookup_sub_thandle(tmt, sub_dt);
1175                 if (st == NULL) {
1176                         st = create_sub_thandle(tmt, sub_dt);
1177                         if (IS_ERR(st))
1178                                 GOTO(next, rc = PTR_ERR(st));
1179                 }
1180
1181                 /* check if updates on the OSD/OSP are committed */
1182                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1183                 if (rc == 0)
1184                         /* If this is committed, goto next */
1185                         goto next;
1186
1187                 if (rc < 0)
1188                         GOTO(next, rc);
1189
1190                 /* Create thandle for sub thandle if needed */
1191                 if (st->st_sub_th == NULL) {
1192                         rc = sub_thandle_trans_create(env, top_th, st);
1193                         if (rc != 0)
1194                                 GOTO(next, rc);
1195                 }
1196
1197                 CDEBUG(D_HA, "replay %uth update\n", i);
1198                 switch (op->uop_type) {
1199                 case OUT_CREATE:
1200                         rc = update_recovery_create(env, sub_dt_obj,
1201                                                     op, params, ta,
1202                                                     st->st_sub_th);
1203                         break;
1204                 case OUT_DESTROY:
1205                         rc = update_recovery_destroy(env, sub_dt_obj,
1206                                                      op, params, ta,
1207                                                      st->st_sub_th);
1208                         break;
1209                 case OUT_REF_ADD:
1210                         rc = update_recovery_ref_add(env, sub_dt_obj,
1211                                                      op, params, ta,
1212                                                      st->st_sub_th);
1213                         break;
1214                 case OUT_REF_DEL:
1215                         rc = update_recovery_ref_del(env, sub_dt_obj,
1216                                                      op, params, ta,
1217                                                      st->st_sub_th);
1218                         break;
1219                 case OUT_ATTR_SET:
1220                         rc = update_recovery_attr_set(env, sub_dt_obj,
1221                                                       op, params, ta,
1222                                                       st->st_sub_th);
1223                         break;
1224                 case OUT_XATTR_SET:
1225                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1226                                                        op, params, ta,
1227                                                        st->st_sub_th);
1228                         break;
1229                 case OUT_INDEX_INSERT:
1230                         rc = update_recovery_index_insert(env, sub_dt_obj,
1231                                                           op, params, ta,
1232                                                           st->st_sub_th);
1233                         break;
1234                 case OUT_INDEX_DELETE:
1235                         rc = update_recovery_index_delete(env, sub_dt_obj,
1236                                                           op, params, ta,
1237                                                           st->st_sub_th);
1238                         break;
1239                 case OUT_WRITE:
1240                         rc = update_recovery_write(env, sub_dt_obj,
1241                                                    op, params, ta,
1242                                                    st->st_sub_th);
1243                         break;
1244                 case OUT_XATTR_DEL:
1245                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1246                                                        op, params, ta,
1247                                                        st->st_sub_th);
1248                         break;
1249                 default:
1250                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1251                         rc = -EINVAL;
1252                         break;
1253                 }
1254 next:
1255                 lu_object_put(env, &dt_obj->do_lu);
1256                 if (rc < 0)
1257                         break;
1258         }
1259
1260         ta->ta_handle->th_result = rc;
1261         RETURN(rc);
1262 }
1263
1264 /**
1265  * redo updates on MDT if needed.
1266  *
1267  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1268  * this function to replay distribute txn updates on all MDTs. It only replay
1269  * updates on the MDT where the update record is missing.
1270  *
1271  * If the update already exists on the MDT, then it does not need replay the
1272  * updates on that MDT, and only mark the sub transaction has been committed
1273  * there.
1274  *
1275  * \param[in] env       execution environment
1276  * \param[in] tdtd      target distribute txn data, which holds the replay list
1277  *                      and all parameters needed by replay process.
1278  * \param[in] dtrq      distribute txn replay req.
1279  *
1280  * \retval              0 if replay succeeds.
1281  * \retval              negative errno if replay failes.
1282  */
1283 int distribute_txn_replay_handle(struct lu_env *env,
1284                                  struct target_distribute_txn_data *tdtd,
1285                                  struct distribute_txn_replay_req *dtrq)
1286 {
1287         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1288         struct thandle_exec_args *ta;
1289         struct lu_context       session_env;
1290         struct thandle          *th = NULL;
1291         struct top_thandle      *top_th;
1292         struct top_multiple_thandle *tmt;
1293         struct thandle_update_records *tur = NULL;
1294         int                     i;
1295         int                     rc = 0;
1296         ENTRY;
1297
1298         /* initialize session, it is needed for the handler of target */
1299         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1300         if (rc) {
1301                 CERROR("%s: failure to initialize session: rc = %d\n",
1302                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1303                 RETURN(rc);
1304         }
1305         lu_context_enter(&session_env);
1306         env->le_ses = &session_env;
1307         lu_env_refill(env);
1308         update_records_dump(records, D_HA, true);
1309         th = top_trans_create(env, NULL);
1310         if (IS_ERR(th))
1311                 GOTO(exit_session, rc = PTR_ERR(th));
1312
1313         ta = &update_env_info(env)->uti_tea;
1314         ta->ta_argno = 0;
1315
1316         update_env_info(env)->uti_dtrq = dtrq;
1317         /* Create distribute transaction structure for this top thandle */
1318         top_th = container_of(th, struct top_thandle, tt_super);
1319         rc = top_trans_create_tmt(env, top_th);
1320         if (rc < 0)
1321                 GOTO(stop_trans, rc);
1322
1323         th->th_dev = tdtd->tdtd_dt;
1324         ta->ta_handle = th;
1325
1326         /* check if the distribute transaction has been committed */
1327         tmt = top_th->tt_multiple_thandle;
1328         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1329         tmt->tmt_batchid = dtrq->dtrq_batchid;
1330         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1331
1332         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1333                 tmt->tmt_committed = 1;
1334
1335         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1336         if (rc < 0)
1337                 GOTO(stop_trans, rc);
1338
1339         /* If no updates are needed to be replayed, then mark this records as
1340          * committed, so commit thread distribute_txn_commit_thread() will
1341          * delete the record */
1342         if (ta->ta_argno == 0)
1343                 tmt->tmt_committed = 1;
1344
1345         tur = &update_env_info(env)->uti_tur;
1346         tur->tur_update_records = dtrq->dtrq_lur;
1347         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1348         tur->tur_update_params = NULL;
1349         tur->tur_update_param_count = 0;
1350         tmt->tmt_update_records = tur;
1351
1352         distribute_txn_insert_by_batchid(tmt);
1353         rc = top_trans_start(env, NULL, th);
1354         if (rc < 0)
1355                 GOTO(stop_trans, rc);
1356
1357         for (i = 0; i < ta->ta_argno; i++) {
1358                 struct tx_arg           *ta_arg;
1359                 struct dt_object        *dt_obj;
1360                 struct dt_device        *sub_dt;
1361                 struct sub_thandle      *st;
1362
1363                 ta_arg = ta->ta_args[i];
1364                 dt_obj = ta_arg->object;
1365
1366                 LASSERT(tmt->tmt_committed == 0);
1367                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1368                 st = lookup_sub_thandle(tmt, sub_dt);
1369
1370                 LASSERT(st != NULL);
1371                 LASSERT(st->st_sub_th != NULL);
1372                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1373                                              ta->ta_args[i]);
1374
1375                 /* If the update is to update the reply data, then
1376                  * we need set the session information, so
1377                  * tgt_last_rcvd_update() can be called correctly */
1378                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1379                         update_recovery_update_ses(env, tdtd, th,
1380                                                    st->st_sub_th, dtrq, ta_arg);
1381
1382                 if (unlikely(rc < 0)) {
1383                         CDEBUG(D_HA, "error during execution of #%u from"
1384                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1385                                ta->ta_args[i]->line, rc);
1386                         while (--i > 0) {
1387                                 if (ta->ta_args[i]->undo_fn != NULL) {
1388                                         dt_obj = ta->ta_args[i]->object;
1389                                         sub_dt =
1390                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1391                                         st = lookup_sub_thandle(tmt, sub_dt);
1392                                         LASSERT(st != NULL);
1393                                         LASSERT(st->st_sub_th != NULL);
1394
1395                                         ta->ta_args[i]->undo_fn(env,
1396                                                                st->st_sub_th,
1397                                                                ta->ta_args[i]);
1398                                 } else {
1399                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1400                                              dt_obd_name(ta->ta_handle->th_dev),
1401                                                ta->ta_args[i]->file,
1402                                                ta->ta_args[i]->line, -ENOTSUPP);
1403                                 }
1404                         }
1405                         break;
1406                 }
1407                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1408                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1409         }
1410
1411 stop_trans:
1412         if (rc < 0)
1413                 th->th_result = rc;
1414         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1415         for (i = 0; i < ta->ta_argno; i++) {
1416                 if (ta->ta_args[i]->object != NULL) {
1417                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1418                         ta->ta_args[i]->object = NULL;
1419                 }
1420         }
1421
1422         if (tur != NULL)
1423                 tur->tur_update_records = NULL;
1424
1425         if (tgt_ses_info(env)->tsi_exp != NULL) {
1426                 class_export_put(tgt_ses_info(env)->tsi_exp);
1427                 tgt_ses_info(env)->tsi_exp = NULL;
1428         }
1429 exit_session:
1430         lu_context_exit(&session_env);
1431         lu_context_fini(&session_env);
1432         RETURN(rc);
1433 }
1434 EXPORT_SYMBOL(distribute_txn_replay_handle);