Whamcloud - gitweb
New RC 2.16.0-RC5
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2015, 2017, Intel Corporation.
5  */
6
/*
 * This file implements the methods to handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. The replays come either from client replay requests or from
 * update replay records (for distributed transactions) in the update log.
 * For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * of the MDTs. For each distributed operation it checks the updates on all
 * MDTs; if some update records are missing on some MDTs, the replay thread
 * redoes the updates on those MDTs.
 *
 * Author: Di Wang <di.wang@intel.com>
 */
23
24 #define DEBUG_SUBSYSTEM S_CLASS
25
26 #include <lu_target.h>
27 #include <lustre_obdo.h>
28 #include <lustre_update.h>
29 #include <lustre_swab.h>
30 #include <md_object.h>
31 #include <obd.h>
32 #include <obd_class.h>
33
34 #include "tgt_internal.h"
35
36 /**
37  * Lookup distribute_txn_replay req
38  *
39  * Lookup distribute_txn_replay in the replay list by batchid.
40  * It is assumed the list has been locked before calling this function.
41  *
42  * \param[in] tdtd      distribute_txn_data, which holds the replay
43  *                      list.
44  * \param[in] batchid   batchid used by lookup.
45  *
46  * \retval              pointer of the replay if succeeds.
47  * \retval              NULL if can not find it.
48  */
49 static struct distribute_txn_replay_req *
50 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
51 {
52         struct distribute_txn_replay_req        *tmp;
53         struct distribute_txn_replay_req        *dtrq = NULL;
54
55         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
56                 if (tmp->dtrq_batchid == batchid) {
57                         dtrq = tmp;
58                         break;
59                 }
60         }
61         return dtrq;
62 }
63
64 /**
65  * insert distribute txn replay req
66  *
67  * Insert distribute txn replay to the replay list, and it assumes the
68  * list has been looked. Note: the replay list is a sorted list, which
69  * is sorted by master transno. It is assumed the replay list has been
70  * locked before calling this function.
71  *
72  * \param[in] tdtd      target distribute txn data where replay list is
73  * \param[in] new       distribute txn replay to be inserted
74  *
75  * \retval              0 if insertion succeeds
76  * \retval              EEXIST if the dtrq already exists
77  */
78 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
79                         struct distribute_txn_replay_req *new)
80 {
81         struct distribute_txn_replay_req *iter;
82
83         /* Check if the dtrq has been added to the list */
84         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
85         if (iter != NULL)
86                 return -EEXIST;
87
88         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
89                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
90                         continue;
91
92                 /* If there are mulitple replay req with same transno, then
93                  * sort them with batchid */
94                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
95                     iter->dtrq_batchid > new->dtrq_batchid)
96                         continue;
97
98                 list_add(&new->dtrq_list, &iter->dtrq_list);
99                 break;
100         }
101
102         if (list_empty(&new->dtrq_list))
103                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
104
105         return 0;
106 }
107
108 /**
109  * create distribute txn replay req
110  *
111  * Allocate distribute txn replay req according to the update records.
112  *
113  * \param[in] tdtd      target distribute txn data where replay list is.
114  * \param[in] record    update records from the update log.
115  *
116  * \retval              the pointer of distribute txn replay req if
117  *                      the creation succeeds.
118  * \retval              NULL if the creation fails.
119  */
120 static struct distribute_txn_replay_req *
121 dtrq_create(struct target_distribute_txn_data *tdtd,
122             struct llog_update_record *lur)
123 {
124         struct distribute_txn_replay_req *new;
125
126         OBD_ALLOC_PTR(new);
127         if (new == NULL)
128                 RETURN(ERR_PTR(-ENOMEM));
129
130         new->dtrq_lur_size = llog_update_record_size(lur);
131         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
132         if (new->dtrq_lur == NULL) {
133                 OBD_FREE_PTR(new);
134                 RETURN(ERR_PTR(-ENOMEM));
135         }
136
137         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
138
139         /* If the transno in the update record is 0, it means the
140          * update are from master MDT, and it will use the master
141          * last committed transno as its master transno. Later, if
142          * the update records are gotten from slave MDTs, then these
143          * transno will be replaced.
144          * See insert_update_records_to_replay_list(). */
145         if (lur->lur_update_rec.ur_master_transno == 0) {
146                 new->dtrq_lur->lur_update_rec.ur_master_transno =
147                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
148                 new->dtrq_master_transno =
149                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
150         } else {
151                 new->dtrq_master_transno =
152                                 lur->lur_update_rec.ur_master_transno;
153         }
154
155         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
156
157         spin_lock_init(&new->dtrq_sub_list_lock);
158         INIT_LIST_HEAD(&new->dtrq_sub_list);
159         INIT_LIST_HEAD(&new->dtrq_list);
160
161         RETURN(new);
162 }
163
164 /**
165  * Lookup distribute sub replay
166  *
167  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
168  * mdt_index.
169  *
170  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
171  * \param[in] mdt_index                 the mdt_index as the key of lookup
172  *
173  * \retval              the pointer of sub replay if it can be found.
174  * \retval              NULL if it can not find.
175  */
176 struct distribute_txn_replay_req_sub *
177 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
178 {
179         struct distribute_txn_replay_req_sub *dtrqs = NULL;
180         struct distribute_txn_replay_req_sub *tmp;
181
182         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
183                 if (tmp->dtrqs_mdt_index == mdt_index) {
184                         dtrqs = tmp;
185                         break;
186                 }
187         }
188         return dtrqs;
189 }
190
191 /**
192  * Try to add cookie to sub distribute txn request
193  *
194  * Check if the update log cookie has been added to the request, if not,
195  * add it to the dtrqs_cookie_list.
196  *
197  * \param[in] dtrqs     sub replay req where cookies to be added.
198  * \param[in] cookie    cookie to be added.
199  *
200  * \retval              0 if the cookie is adding succeeds.
201  * \retval              negative errno if adding fails.
202  */
203 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
204                                struct llog_cookie *cookie)
205 {
206         struct sub_thandle_cookie *new;
207
208         OBD_ALLOC_PTR(new);
209         if (new == NULL)
210                 return -ENOMEM;
211
212         INIT_LIST_HEAD(&new->stc_list);
213         new->stc_cookie = *cookie;
214         /* Note: only single thread will access one sub_request each time,
215          * so no need lock here */
216         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
217
218         return 0;
219 }
220
221 /**
222  * Insert distribute txn sub req replay
223  *
224  * Allocate sub replay req and insert distribute txn replay list.
225  *
226  * \param[in] dtrq      d to be added
227  * \param[in] cookie    the cookie of the update record
228  * \param[in] mdt_index the mdt_index of the update record
229  *
230  * \retval              0 if the adding succeeds.
231  * \retval              negative errno if the adding fails.
232  */
233 static int
234 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
235                            struct llog_cookie *cookie,
236                            __u32 mdt_index)
237 {
238         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
239         struct distribute_txn_replay_req_sub    *new;
240         int                                     rc;
241         ENTRY;
242
243         spin_lock(&dtrq->dtrq_sub_list_lock);
244         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
245         spin_unlock(&dtrq->dtrq_sub_list_lock);
246         if (dtrqs != NULL) {
247                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
248                 RETURN(0);
249         }
250
251         OBD_ALLOC_PTR(new);
252         if (new == NULL)
253                 RETURN(-ENOMEM);
254
255         INIT_LIST_HEAD(&new->dtrqs_list);
256         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
257         new->dtrqs_mdt_index = mdt_index;
258         spin_lock(&dtrq->dtrq_sub_list_lock);
259         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
260         if (dtrqs == NULL) {
261                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
262                 dtrqs = new;
263         } else {
264                 OBD_FREE_PTR(new);
265         }
266         spin_unlock(&dtrq->dtrq_sub_list_lock);
267
268         rc = dtrq_sub_add_cookie(dtrqs, cookie);
269
270         RETURN(rc);
271 }
272
273 /**
274  * append updates to the current replay updates
275  *
276  * Append more updates to the existent replay update. And this is only
277  * used when combining mulitple updates into one large updates during
278  * replay.
279  *
280  * \param[in] dtrq      the update replay request where the new update
281  *                      records will be added.
282  * \param[in] lur       the new update record.
283  *
284  * \retval              0 if appending succeeds.
285  * \retval              negative errno if appending fails.
286  */
287 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
288                                struct update_records *record)
289 {
290         struct llog_update_record *new_lur;
291         size_t lur_size = dtrq->dtrq_lur_size;
292         void *ptr;
293         ENTRY;
294
295         /* Because several threads might retrieve the same records from
296          * different targets, and we only need one copy of records. So
297          * we will check if the records is in the next one, if not, just
298          * skip it */
299         spin_lock(&dtrq->dtrq_sub_list_lock);
300         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
301                 spin_unlock(&dtrq->dtrq_sub_list_lock);
302                 RETURN(0);
303         }
304         dtrq->dtrq_lur->lur_update_rec.ur_index++;
305         spin_unlock(&dtrq->dtrq_sub_list_lock);
306
307         lur_size += update_records_size(record);
308         OBD_ALLOC_LARGE(new_lur, lur_size);
309         if (new_lur == NULL) {
310                 spin_lock(&dtrq->dtrq_sub_list_lock);
311                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
312                 spin_unlock(&dtrq->dtrq_sub_list_lock);
313                 RETURN(-ENOMEM);
314         }
315
316         /* Copy the old and new records to the new allocated buffer */
317         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
318         ptr = (char *)&new_lur->lur_update_rec +
319                 update_records_size(&new_lur->lur_update_rec);
320         memcpy(ptr, &record->ur_ops,
321                update_records_size(record) -
322                offsetof(struct update_records, ur_ops));
323
324         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
325         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
326         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
327
328         /* Replace the records */
329         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
330         dtrq->dtrq_lur = new_lur;
331         dtrq->dtrq_lur_size = lur_size;
332         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
333         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
334         RETURN(0);
335 }
336
337 /**
338  * Insert update records to the replay list.
339  *
340  * Allocate distribute txn replay req and insert it into the replay
341  * list, then insert the update records into the replay req.
342  *
343  * \param[in] tdtd      distribute txn replay data where the replay list
344  *                      is.
345  * \param[in] record    the update record
346  * \param[in] cookie    cookie of the record
347  * \param[in] index     mdt index of the record
348  *
349  * \retval              0 if the adding succeeds.
350  * \retval              negative errno if the adding fails.
351  */
352 int
353 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
354                                      struct llog_update_record *lur,
355                                      struct llog_cookie *cookie,
356                                      __u32 mdt_index)
357 {
358         struct distribute_txn_replay_req *dtrq;
359         struct update_records *record = &lur->lur_update_rec;
360         bool replace_record = false;
361         int rc = 0;
362         ENTRY;
363
364         CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
365                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
366                record->ur_batchid, record->ur_master_transno, mdt_index);
367
368         /* Update batchid if necessary */
369         spin_lock(&tdtd->tdtd_batchid_lock);
370         if (record->ur_batchid >= tdtd->tdtd_batchid) {
371                 CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
372                        tdtd->tdtd_lut->lut_obd->obd_name,
373                        tdtd->tdtd_batchid, record->ur_batchid);
374                 tdtd->tdtd_batchid = record->ur_batchid + 1;
375         }
376         spin_unlock(&tdtd->tdtd_batchid_lock);
377
378 again:
379         spin_lock(&tdtd->tdtd_replay_list_lock);
380         /* First try to build the replay update request with the records */
381         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
382         if (dtrq == NULL) {
383                 spin_unlock(&tdtd->tdtd_replay_list_lock);
384                 dtrq = dtrq_create(tdtd, lur);
385                 if (IS_ERR(dtrq))
386                         RETURN(PTR_ERR(dtrq));
387
388                 spin_lock(&tdtd->tdtd_replay_list_lock);
389                 rc = dtrq_insert(tdtd, dtrq);
390                 if (rc < 0) {
391                         spin_unlock(&tdtd->tdtd_replay_list_lock);
392                         dtrq_destroy(dtrq);
393                         if (rc == -EEXIST)
394                                 goto again;
395                         return rc;
396                 }
397         } else {
398                 /* If the master transno in update header is not
399                 * matched with the one in the record, then it means
400                 * the dtrq is originally created by master record,
401                 * so we need update master transno and reposition
402                 * the dtrq(by master transno) in the list and also
403                 * replace update record */
404                 if (record->ur_master_transno != 0 &&
405                     dtrq->dtrq_master_transno != record->ur_master_transno &&
406                     dtrq->dtrq_lur != NULL) {
407                         list_del_init(&dtrq->dtrq_list);
408                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
409                                                 record->ur_master_transno;
410
411                         dtrq->dtrq_master_transno = record->ur_master_transno;
412                         replace_record = true;
413                         /* try to insert again */
414                         rc = dtrq_insert(tdtd, dtrq);
415                         if (rc < 0) {
416                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
417                                 dtrq_destroy(dtrq);
418                                 return rc;
419                         }
420                 }
421         }
422         spin_unlock(&tdtd->tdtd_replay_list_lock);
423
424         /* Because there should be only thread access the update record, so
425          * we do not need lock here */
426         if (replace_record) {
427                 /* Replace the update record and master transno */
428                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
429                 dtrq->dtrq_lur = NULL;
430                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
431                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
432                 if (dtrq->dtrq_lur == NULL)
433                         return -ENOMEM;
434
435                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
436         }
437
438         /* This is a partial update records, let's try to append
439          * the record to the current replay request */
440         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
441                 rc = dtrq_append_updates(dtrq, record);
442
443         /* Then create and add sub update request */
444         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
445
446         RETURN(rc);
447 }
448 EXPORT_SYMBOL(insert_update_records_to_replay_list);
449
450 /**
451  * Dump updates of distribute txns.
452  *
453  * Output all of recovery updates in the distribute txn list to the
454  * debug log.
455  *
456  * \param[in] tdtd      distribute txn data where all of distribute txn
457  *                      are listed.
458  * \param[in] mask      debug mask
459  */
460 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
461 {
462         struct distribute_txn_replay_req *dtrq;
463
464         spin_lock(&tdtd->tdtd_replay_list_lock);
465         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
466                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
467                                     false);
468         spin_unlock(&tdtd->tdtd_replay_list_lock);
469 }
470 EXPORT_SYMBOL(dtrq_list_dump);
471
472 /**
473  * Destroy distribute txn replay req
474  *
475  * Destroy distribute txn replay req and all of subs.
476  *
477  * \param[in] dtrq      distribute txn replqy req to be destroyed.
478  */
479 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
480 {
481         struct distribute_txn_replay_req_sub    *dtrqs;
482         struct distribute_txn_replay_req_sub    *tmp;
483
484         LASSERT(list_empty(&dtrq->dtrq_list));
485         CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
486                dtrq->dtrq_master_transno);
487         spin_lock(&dtrq->dtrq_sub_list_lock);
488         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
489                 struct sub_thandle_cookie *stc;
490                 struct sub_thandle_cookie *tmp;
491
492                 list_del(&dtrqs->dtrqs_list);
493                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
494                                          stc_list) {
495                         list_del(&stc->stc_list);
496                         OBD_FREE_PTR(stc);
497                 }
498                 OBD_FREE_PTR(dtrqs);
499         }
500         spin_unlock(&dtrq->dtrq_sub_list_lock);
501
502         if (dtrq->dtrq_lur != NULL)
503                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
504
505         OBD_FREE_PTR(dtrq);
506 }
507 EXPORT_SYMBOL(dtrq_destroy);
508
509 /**
510  * Destroy all of replay req.
511  *
512  * Destroy all of replay req in the replay list.
513  *
514  * \param[in] tdtd      target distribute txn data where the replay list is.
515  */
516 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
517 {
518         struct distribute_txn_replay_req *dtrq;
519         struct distribute_txn_replay_req *tmp;
520
521         spin_lock(&tdtd->tdtd_replay_list_lock);
522         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
523                                  dtrq_list) {
524                 list_del_init(&dtrq->dtrq_list);
525                 dtrq_destroy(dtrq);
526         }
527         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
528                                  dtrq_list) {
529                 list_del_init(&dtrq->dtrq_list);
530                 dtrq_destroy(dtrq);
531         }
532         spin_unlock(&tdtd->tdtd_replay_list_lock);
533 }
534 EXPORT_SYMBOL(dtrq_list_destroy);
535
536 /**
537  * Get next req in the replay list
538  *
539  * Get next req needs to be replayed, since it is a sorted list
540  * (by master MDT transno)
541  *
542  * \param[in] tdtd      distribute txn data where the replay list is
543  *
544  * \retval              the pointer of update recovery header
545  */
546 struct distribute_txn_replay_req *
547 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
548 {
549         struct distribute_txn_replay_req *dtrq = NULL;
550
551         spin_lock(&tdtd->tdtd_replay_list_lock);
552         if (!list_empty(&tdtd->tdtd_replay_list)) {
553                 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
554                                         struct distribute_txn_replay_req,
555                                         dtrq_list);
556                 list_del_init(&dtrq->dtrq_list);
557         }
558         spin_unlock(&tdtd->tdtd_replay_list_lock);
559
560         return dtrq;
561 }
562 EXPORT_SYMBOL(distribute_txn_get_next_req);
563
564 /**
565  * Get next transno in the replay list, because this is the sorted
566  * list, so it will return the transno of next req in the list.
567  *
568  * \param[in] tdtd      distribute txn data where the replay list is
569  *
570  * \retval              the transno of next update in the list
571  */
572 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
573 {
574         struct distribute_txn_replay_req        *dtrq = NULL;
575         __u64                                   transno = 0;
576
577         spin_lock(&tdtd->tdtd_replay_list_lock);
578         if (!list_empty(&tdtd->tdtd_replay_list)) {
579                 dtrq = list_first_entry(&tdtd->tdtd_replay_list,
580                                         struct distribute_txn_replay_req,
581                                         dtrq_list);
582                 transno = dtrq->dtrq_master_transno;
583         }
584         spin_unlock(&tdtd->tdtd_replay_list_lock);
585
586         CDEBUG(D_HA, "%s: Next update transno %llu\n",
587                tdtd->tdtd_lut->lut_obd->obd_name, transno);
588         return transno;
589 }
590 EXPORT_SYMBOL(distribute_txn_get_next_transno);
591
592 struct distribute_txn_replay_req *
593 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
594                                   __u64 transno)
595 {
596         struct distribute_txn_replay_req *dtrq = NULL;
597         struct distribute_txn_replay_req *iter;
598
599         spin_lock(&tdtd->tdtd_replay_list_lock);
600         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
601                 if (iter->dtrq_master_transno == transno) {
602                         dtrq = iter;
603                         break;
604                 }
605         }
606         spin_unlock(&tdtd->tdtd_replay_list_lock);
607         return dtrq;
608 }
609
610 bool is_req_replayed_by_update(struct ptlrpc_request *req)
611 {
612         struct lu_target *tgt = class_exp2tgt(req->rq_export);
613         struct distribute_txn_replay_req *dtrq;
614
615         if (tgt->lut_tdtd == NULL)
616                 return false;
617
618         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
619                                         lustre_msg_get_transno(req->rq_reqmsg));
620         if (dtrq == NULL)
621                 return false;
622
623         return true;
624 }
625 EXPORT_SYMBOL(is_req_replayed_by_update);
626
627 /**
628  * Check if the update of one object is committed
629  *
630  * Check whether the update for the object is committed by checking whether
631  * the correspondent sub exists in the replay req. If it is committed, mark
632  * the committed flag in correspondent the sub thandle.
633  *
634  * \param[in] env       execution environment
635  * \param[in] dtrq      replay request
636  * \param[in] dt_obj    object for the update
637  * \param[in] top_th    top thandle
638  * \param[in] sub_th    sub thandle which the update belongs to
639  *
640  * \retval              1 if the update is not committed.
641  * \retval              0 if the update is committed.
642  * \retval              negative errno if some other failures happen.
643  */
644 static int update_is_committed(const struct lu_env *env,
645                                struct distribute_txn_replay_req *dtrq,
646                                struct dt_object *dt_obj,
647                                struct top_thandle *top_th,
648                                struct sub_thandle *st)
649 {
650         struct seq_server_site  *seq_site;
651         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
652         struct distribute_txn_replay_req_sub    *dtrqs;
653         __u32                   mdt_index;
654         ENTRY;
655
656         if (st->st_sub_th != NULL)
657                 RETURN(1);
658
659         if (st->st_committed)
660                 RETURN(0);
661
662         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
663         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
664                 mdt_index = fid_oid(fid);
665         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
666                 mdt_index = seq_site->ss_node_id;
667         } else {
668                 struct lu_server_fld *fld;
669                 struct lu_seq_range range = {0};
670                 int rc;
671
672                 fld = seq_site->ss_server_fld;
673                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
674                 LASSERT(fld->lsf_seq_lookup != NULL);
675                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
676                                          &range);
677                 if (rc < 0)
678                         RETURN(rc);
679                 mdt_index = range.lsr_index;
680         }
681
682         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
683         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
684                 st->st_committed = 1;
685                 if (dtrqs != NULL) {
686                         struct sub_thandle_cookie *stc;
687                         struct sub_thandle_cookie *tmp;
688
689                         list_for_each_entry_safe(stc, tmp,
690                                                  &dtrqs->dtrqs_cookie_list,
691                                                  stc_list)
692                                 list_move(&stc->stc_list, &st->st_cookie_list);
693                 }
694                 RETURN(0);
695         }
696
697         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
698                mdt_index);
699
700         RETURN(1);
701 }
702
703 /**
704  * Implementation of different update methods for update recovery.
705  *
706  * These following functions update_recovery_$(update_name) implement
707  * different updates recovery methods. They will extract the parameters
708  * from the common parameters area and call correspondent dt API to redo
709  * the update.
710  *
711  * \param[in] env       execution environment
712  * \param[in] op        update operation to be replayed
713  * \param[in] params    common update parameters which holds all parameters
714  *                      of the operation
715  * \param[in] th        transaction handle
716  * \param[in] declare   indicate it will do declare or real execution, true
717  *                      means declare, false means real execution
718  *
719  * \retval              0 if it succeeds.
720  * \retval              negative errno if it fails.
721  */
722 static int update_recovery_create(const struct lu_env *env,
723                                   struct dt_object *dt_obj,
724                                   const struct update_op *op,
725                                   const struct update_params *params,
726                                   struct thandle_exec_args *ta,
727                                   struct thandle *th)
728 {
729         struct update_thread_info *uti = update_env_info(env);
730         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
731         struct lu_attr          *attr = &uti->uti_attr;
732         struct obdo             *wobdo;
733         struct obdo             *lobdo = &uti->uti_obdo;
734         struct dt_object_format dof;
735         __u16                   size;
736         unsigned int            param_count;
737         int rc;
738         ENTRY;
739
740         if (dt_object_exists(dt_obj))
741                 RETURN(-EEXIST);
742
743         param_count = lur->lur_update_rec.ur_param_count;
744         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
745                                             param_count, &size);
746         if (wobdo == NULL)
747                 RETURN(-EIO);
748         if (size != sizeof(*wobdo))
749                 RETURN(-EIO);
750
751         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
752                 lustre_swab_obdo(wobdo);
753
754         lustre_get_wire_obdo(NULL, lobdo, wobdo);
755         la_from_obdo(attr, lobdo, lobdo->o_valid);
756
757         dof.dof_type = dt_mode_to_dft(attr->la_mode);
758
759         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
760                            ta, th, NULL, 0);
761
762         RETURN(rc);
763 }
764
765 static int update_recovery_destroy(const struct lu_env *env,
766                                    struct dt_object *dt_obj,
767                                    const struct update_op *op,
768                                    const struct update_params *params,
769                                    struct thandle_exec_args *ta,
770                                    struct thandle *th)
771 {
772         int rc;
773         ENTRY;
774
775         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
776
777         RETURN(rc);
778 }
779
780 static int update_recovery_ref_add(const struct lu_env *env,
781                                    struct dt_object *dt_obj,
782                                    const struct update_op *op,
783                                    const struct update_params *params,
784                                    struct thandle_exec_args *ta,
785                                    struct thandle *th)
786 {
787         int rc;
788         ENTRY;
789
790         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
791
792         RETURN(rc);
793 }
794
795 static int update_recovery_ref_del(const struct lu_env *env,
796                                    struct dt_object *dt_obj,
797                                    const struct update_op *op,
798                                    const struct update_params *params,
799                                    struct thandle_exec_args *ta,
800                                    struct thandle *th)
801 {
802         int rc;
803         ENTRY;
804
805         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
806
807         RETURN(rc);
808 }
809
810 static int update_recovery_attr_set(const struct lu_env *env,
811                                     struct dt_object *dt_obj,
812                                     const struct update_op *op,
813                                     const struct update_params *params,
814                                     struct thandle_exec_args *ta,
815                                     struct thandle *th)
816 {
817         struct update_thread_info *uti = update_env_info(env);
818         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
819         struct obdo     *wobdo;
820         struct obdo     *lobdo = &uti->uti_obdo;
821         struct lu_attr  *attr = &uti->uti_attr;
822         __u16           size;
823         unsigned int    param_count;
824         int             rc;
825         ENTRY;
826
827         param_count = lur->lur_update_rec.ur_param_count;
828         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
829                                             param_count, &size);
830         if (wobdo == NULL)
831                 RETURN(-EIO);
832         if (size != sizeof(*wobdo))
833                 RETURN(-EIO);
834
835         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
836                 lustre_swab_obdo(wobdo);
837
838         lustre_get_wire_obdo(NULL, lobdo, wobdo);
839         la_from_obdo(attr, lobdo, lobdo->o_valid);
840
841         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
842
843         RETURN(rc);
844 }
845
846 static int update_recovery_xattr_set(const struct lu_env *env,
847                                      struct dt_object *dt_obj,
848                                      const struct update_op *op,
849                                      const struct update_params *params,
850                                      struct thandle_exec_args *ta,
851                                      struct thandle *th)
852 {
853         struct update_thread_info *uti = update_env_info(env);
854         char            *buf;
855         char            *name;
856         int             fl;
857         __u16           size;
858         __u32           param_count;
859         int             rc;
860         ENTRY;
861
862         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
863         name = update_params_get_param_buf(params,
864                                            op->uop_params_off[0],
865                                            param_count, &size);
866         if (name == NULL)
867                 RETURN(-EIO);
868
869         buf = update_params_get_param_buf(params,
870                                           op->uop_params_off[1],
871                                           param_count, &size);
872         if (buf == NULL)
873                 RETURN(-EIO);
874
875         uti->uti_buf.lb_buf = buf;
876         uti->uti_buf.lb_len = (size_t)size;
877
878         buf = update_params_get_param_buf(params, op->uop_params_off[2],
879                                           param_count, &size);
880         if (buf == NULL)
881                 RETURN(-EIO);
882         if (size != sizeof(fl))
883                 RETURN(-EIO);
884
885         fl = le32_to_cpu(*(int *)buf);
886
887         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
888                               NULL, 0);
889
890         RETURN(rc);
891 }
892
893 static int update_recovery_index_insert(const struct lu_env *env,
894                                         struct dt_object *dt_obj,
895                                         const struct update_op *op,
896                                         const struct update_params *params,
897                                         struct thandle_exec_args *ta,
898                                         struct thandle *th)
899 {
900         struct update_thread_info *uti = update_env_info(env);
901         struct lu_fid           *fid;
902         char                    *name;
903         __u32                   param_count;
904         __u32                   *ptype;
905         __u32                   type;
906         __u16                   size;
907         int rc;
908         ENTRY;
909
910         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
911         name = update_params_get_param_buf(params, op->uop_params_off[0],
912                                            param_count, &size);
913         if (name == NULL)
914                 RETURN(-EIO);
915
916         fid = update_params_get_param_buf(params, op->uop_params_off[1],
917                                           param_count, &size);
918         if (fid == NULL)
919                 RETURN(-EIO);
920         if (size != sizeof(*fid))
921                 RETURN(-EIO);
922
923         fid_le_to_cpu(fid, fid);
924
925         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
926                                             param_count, &size);
927         if (ptype == NULL)
928                 RETURN(-EIO);
929         if (size != sizeof(*ptype))
930                 RETURN(-EIO);
931         type = le32_to_cpu(*ptype);
932
933         if (!dt_try_as_dir(env, dt_obj, false))
934                 RETURN(-ENOTDIR);
935
936         uti->uti_rec.rec_fid = fid;
937         uti->uti_rec.rec_type = type;
938
939         rc = out_tx_index_insert(env, dt_obj,
940                                  (const struct dt_rec *)&uti->uti_rec,
941                                  (const struct dt_key *)name, ta, th,
942                                  NULL, 0);
943
944         RETURN(rc);
945 }
946
947 static int update_recovery_index_delete(const struct lu_env *env,
948                                         struct dt_object *dt_obj,
949                                         const struct update_op *op,
950                                         const struct update_params *params,
951                                         struct thandle_exec_args *ta,
952                                         struct thandle *th)
953 {
954         struct update_thread_info *uti = update_env_info(env);
955         __u32   param_count;
956         char    *name;
957         __u16   size;
958         int     rc;
959         ENTRY;
960
961         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
962         name = update_params_get_param_buf(params, op->uop_params_off[0],
963                                            param_count, &size);
964         if (name == NULL)
965                 RETURN(-EIO);
966
967         if (!dt_try_as_dir(env, dt_obj, true))
968                 RETURN(-ENOTDIR);
969
970         rc = out_tx_index_delete(env, dt_obj,
971                                  (const struct dt_key *)name, ta, th, NULL, 0);
972
973         RETURN(rc);
974 }
975
976 static int update_recovery_write(const struct lu_env *env,
977                                  struct dt_object *dt_obj,
978                                  const struct update_op *op,
979                                  const struct update_params *params,
980                                  struct thandle_exec_args *ta,
981                                  struct thandle *th)
982 {
983         struct update_thread_info *uti = update_env_info(env);
984         char            *buf;
985         __u32           param_count;
986         __u64           pos;
987         __u16           size;
988         int rc;
989         ENTRY;
990
991         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
992         buf = update_params_get_param_buf(params, op->uop_params_off[0],
993                                           param_count, &size);
994         if (buf == NULL)
995                 RETURN(-EIO);
996
997         uti->uti_buf.lb_buf = buf;
998         uti->uti_buf.lb_len = size;
999
1000         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1001                                           param_count, &size);
1002         if (buf == NULL)
1003                 RETURN(-EIO);
1004
1005         pos = le64_to_cpu(*(__u64 *)buf);
1006
1007         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1008                           ta, th, NULL, 0);
1009
1010         RETURN(rc);
1011 }
1012
1013 static int update_recovery_xattr_del(const struct lu_env *env,
1014                                      struct dt_object *dt_obj,
1015                                      const struct update_op *op,
1016                                      const struct update_params *params,
1017                                      struct thandle_exec_args *ta,
1018                                      struct thandle *th)
1019 {
1020         struct update_thread_info *uti = update_env_info(env);
1021         __u32   param_count;
1022         char    *name;
1023         __u16   size;
1024         int     rc;
1025         ENTRY;
1026
1027         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1028         name = update_params_get_param_buf(params, op->uop_params_off[0],
1029                                            param_count, &size);
1030         if (name == NULL)
1031                 RETURN(-EIO);
1032
1033         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1034
1035         RETURN(rc);
1036 }
1037
1038 /**
1039  * Update session information
1040  *
1041  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1042  * can be called correctly during update replay.
1043  *
1044  * \param[in] env       execution environment.
1045  * \param[in] tdtd      distribute data structure of the recovering tgt.
1046  * \param[in] th        thandle of this update replay.
1047  * \param[in] master_th master sub thandle.
1048  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1049  *                      reply data.
1050  */
1051 static void update_recovery_update_ses(struct lu_env *env,
1052                                       struct target_distribute_txn_data *tdtd,
1053                                       struct thandle *th,
1054                                       struct thandle *master_th,
1055                                       struct distribute_txn_replay_req *dtrq,
1056                                       struct tx_arg *ta_arg)
1057 {
1058         struct tgt_session_info *tsi;
1059         struct lu_target *lut = tdtd->tdtd_lut;
1060         struct lsd_reply_header *lrh = &lut->lut_reply_header;
1061         struct lsd_reply_data *lrd;
1062         struct top_thandle *top_th;
1063         struct obd_export *export;
1064         struct cfs_hash *hash;
1065         size_t size;
1066
1067         tsi = tgt_ses_info(env);
1068         if (tsi->tsi_exp != NULL)
1069                 return;
1070
1071         size = ta_arg->u.write.buf.lb_len;
1072         lrd = ta_arg->u.write.buf.lb_buf;
1073         if (size != lrh->lrh_reply_size || lrd == NULL)
1074                 return;
1075
1076         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1077         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1078         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1079         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1080         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1081
1082         CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
1083         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1084                 return;
1085
1086         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1087         if (hash == NULL)
1088                 return;
1089
1090         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1091         if (export == NULL) {
1092                 cfs_hash_putref(hash);
1093                 return;
1094         }
1095
1096         tsi->tsi_exp = export;
1097         tsi->tsi_xid = lrd->lrd_xid;
1098         tsi->tsi_opdata = lrd->lrd_data;
1099         tsi->tsi_result = lrd->lrd_result;
1100         tsi->tsi_client_gen = lrd->lrd_client_gen;
1101         dtrq->dtrq_xid = lrd->lrd_xid;
1102         top_th = container_of(th, struct top_thandle, tt_super);
1103         top_th->tt_master_sub_thandle = master_th;
1104         cfs_hash_putref(hash);
1105 }
1106
1107 /**
1108  * Execute updates in the update replay records
1109  *
1110  * Declare distribute txn replay by update records and add the updates
1111  * to the execution list. Note: it will check if the update has been
1112  * committed, and only execute the updates if it is not committed to
1113  * disk.
1114  *
1115  * \param[in] env       execution environment
1116  * \param[in] tdtd      distribute txn replay data which hold all of replay
1117  *                      reqs and all replay parameters.
1118  * \param[in] dtrq      distribute transaction replay req.
1119  * \param[in] ta        thandle execute args.
1120  *
1121  * \retval              0 if declare succeeds.
1122  * \retval              negative errno if declare fails.
1123  */
1124 static int update_recovery_exec(const struct lu_env *env,
1125                                 struct target_distribute_txn_data *tdtd,
1126                                 struct distribute_txn_replay_req *dtrq,
1127                                 struct thandle_exec_args *ta)
1128 {
1129         struct llog_update_record *lur = dtrq->dtrq_lur;
1130         struct update_records   *records = &lur->lur_update_rec;
1131         struct update_ops       *ops = &records->ur_ops;
1132         struct update_params    *params = update_records_get_params(records);
1133         struct top_thandle      *top_th = container_of(ta->ta_handle,
1134                                                        struct top_thandle,
1135                                                        tt_super);
1136         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1137         struct update_op        *op;
1138         unsigned int            i;
1139         int                     rc = 0;
1140         ENTRY;
1141
1142         /* These records have been swabbed in llog_cat_process() */
1143         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1144              i++, op = update_op_next_op(op)) {
1145                 struct lu_fid           *fid = &op->uop_fid;
1146                 struct dt_object        *dt_obj;
1147                 struct dt_object        *sub_dt_obj;
1148                 struct dt_device        *sub_dt;
1149                 struct sub_thandle      *st;
1150
1151                 if (op->uop_type == OUT_NOOP)
1152                         continue;
1153
1154                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1155                 if (IS_ERR(dt_obj)) {
1156                         rc = PTR_ERR(dt_obj);
1157                         if (rc == -EREMCHG)
1158                                 LCONSOLE_WARN("%.16s: hit invalid OI mapping "
1159                                               "for "DFID" during recovering, "
1160                                               "that may because auto scrub is "
1161                                               "disabled on related MDT, and "
1162                                               "will cause recovery failure. "
1163                                               "Please enable auto scrub and "
1164                                               "retry the recovery.\n",
1165                                               tdtd->tdtd_lut->lut_obd->obd_name,
1166                                               PFID(fid));
1167
1168                         break;
1169                 }
1170                 sub_dt_obj = dt_object_child(dt_obj);
1171
1172                 /* Create sub thandle if not */
1173                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1174                 st = lookup_sub_thandle(tmt, sub_dt);
1175                 if (st == NULL) {
1176                         st = create_sub_thandle(tmt, sub_dt);
1177                         if (IS_ERR(st))
1178                                 GOTO(next, rc = PTR_ERR(st));
1179                 }
1180
1181                 /* check if updates on the OSD/OSP are committed */
1182                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1183                 if (rc == 0)
1184                         /* If this is committed, goto next */
1185                         goto next;
1186
1187                 if (rc < 0)
1188                         GOTO(next, rc);
1189
1190                 /* Create thandle for sub thandle if needed */
1191                 if (st->st_sub_th == NULL) {
1192                         rc = sub_thandle_trans_create(env, top_th, st);
1193                         if (rc != 0)
1194                                 GOTO(next, rc);
1195                 }
1196
1197                 CDEBUG(D_HA, "replay %uth update\n", i);
1198                 switch (op->uop_type) {
1199                 case OUT_CREATE:
1200                         rc = update_recovery_create(env, sub_dt_obj,
1201                                                     op, params, ta,
1202                                                     st->st_sub_th);
1203                         break;
1204                 case OUT_DESTROY:
1205                         rc = update_recovery_destroy(env, sub_dt_obj,
1206                                                      op, params, ta,
1207                                                      st->st_sub_th);
1208                         break;
1209                 case OUT_REF_ADD:
1210                         rc = update_recovery_ref_add(env, sub_dt_obj,
1211                                                      op, params, ta,
1212                                                      st->st_sub_th);
1213                         break;
1214                 case OUT_REF_DEL:
1215                         rc = update_recovery_ref_del(env, sub_dt_obj,
1216                                                      op, params, ta,
1217                                                      st->st_sub_th);
1218                         break;
1219                 case OUT_ATTR_SET:
1220                         rc = update_recovery_attr_set(env, sub_dt_obj,
1221                                                       op, params, ta,
1222                                                       st->st_sub_th);
1223                         break;
1224                 case OUT_XATTR_SET:
1225                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1226                                                        op, params, ta,
1227                                                        st->st_sub_th);
1228                         break;
1229                 case OUT_INDEX_INSERT:
1230                         rc = update_recovery_index_insert(env, sub_dt_obj,
1231                                                           op, params, ta,
1232                                                           st->st_sub_th);
1233                         break;
1234                 case OUT_INDEX_DELETE:
1235                         rc = update_recovery_index_delete(env, sub_dt_obj,
1236                                                           op, params, ta,
1237                                                           st->st_sub_th);
1238                         break;
1239                 case OUT_WRITE:
1240                         rc = update_recovery_write(env, sub_dt_obj,
1241                                                    op, params, ta,
1242                                                    st->st_sub_th);
1243                         break;
1244                 case OUT_XATTR_DEL:
1245                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1246                                                        op, params, ta,
1247                                                        st->st_sub_th);
1248                         break;
1249                 default:
1250                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1251                         rc = -EINVAL;
1252                         break;
1253                 }
1254 next:
1255                 dt_object_put(env, dt_obj);
1256                 if (rc < 0)
1257                         break;
1258         }
1259
1260         ta->ta_handle->th_result = rc;
1261         RETURN(rc);
1262 }
1263
1264 /**
1265  * redo updates on MDT if needed.
1266  *
1267  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1268  * this function to replay distribute txn updates on all MDTs. It only replay
1269  * updates on the MDT where the update record is missing.
1270  *
1271  * If the update already exists on the MDT, then it does not need replay the
1272  * updates on that MDT, and only mark the sub transaction has been committed
1273  * there.
1274  *
1275  * \param[in] env       execution environment
1276  * \param[in] tdtd      target distribute txn data, which holds the replay list
1277  *                      and all parameters needed by replay process.
1278  * \param[in] dtrq      distribute txn replay req.
1279  *
1280  * \retval              0 if replay succeeds.
1281  * \retval              negative errno if replay failes.
1282  */
1283 int distribute_txn_replay_handle(struct lu_env *env,
1284                                  struct target_distribute_txn_data *tdtd,
1285                                  struct distribute_txn_replay_req *dtrq)
1286 {
1287         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1288         struct thandle_exec_args *ta;
1289         struct lu_context       session_env;
1290         struct thandle          *th = NULL;
1291         struct top_thandle      *top_th;
1292         struct top_multiple_thandle *tmt;
1293         struct thandle_update_records *tur = NULL;
1294         int                     i;
1295         int                     rc = 0;
1296         ENTRY;
1297
1298         /* initialize session, it is needed for the handler of target */
1299         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1300         if (rc) {
1301                 CERROR("%s: failure to initialize session: rc = %d\n",
1302                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1303                 RETURN(rc);
1304         }
1305         lu_context_enter(&session_env);
1306         env->le_ses = &session_env;
1307         lu_env_refill(env);
1308         update_records_dump(records, D_HA, true);
1309         th = top_trans_create(env, NULL);
1310         if (IS_ERR(th))
1311                 GOTO(exit_session, rc = PTR_ERR(th));
1312
1313         ta = &update_env_info(env)->uti_tea;
1314         ta->ta_argno = 0;
1315
1316         update_env_info(env)->uti_dtrq = dtrq;
1317         /* Create distribute transaction structure for this top thandle */
1318         top_th = container_of(th, struct top_thandle, tt_super);
1319         rc = top_trans_create_tmt(env, top_th);
1320         if (rc < 0)
1321                 GOTO(stop_trans, rc);
1322
1323         th->th_dev = tdtd->tdtd_dt;
1324         ta->ta_handle = th;
1325
1326         /* check if the distribute transaction has been committed */
1327         tmt = top_th->tt_multiple_thandle;
1328         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1329         tmt->tmt_batchid = dtrq->dtrq_batchid;
1330         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1331
1332         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1333                 tmt->tmt_committed = 1;
1334
1335         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1336         if (rc < 0)
1337                 GOTO(stop_trans, rc);
1338
1339         /* If no updates are needed to be replayed, then mark this records as
1340          * committed, so commit thread distribute_txn_commit_thread() will
1341          * delete the record */
1342         if (ta->ta_argno == 0)
1343                 tmt->tmt_committed = 1;
1344
1345         tur = &update_env_info(env)->uti_tur;
1346         tur->tur_update_records = dtrq->dtrq_lur;
1347         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1348         tur->tur_update_params = NULL;
1349         tur->tur_update_param_count = 0;
1350         tmt->tmt_update_records = tur;
1351
1352         distribute_txn_insert_by_batchid(tmt);
1353         rc = top_trans_start(env, NULL, th);
1354         if (rc < 0)
1355                 GOTO(stop_trans, rc);
1356
1357         for (i = 0; i < ta->ta_argno; i++) {
1358                 struct tx_arg           *ta_arg;
1359                 struct dt_object        *dt_obj;
1360                 struct dt_device        *sub_dt;
1361                 struct sub_thandle      *st;
1362
1363                 ta_arg = ta->ta_args[i];
1364                 dt_obj = ta_arg->object;
1365
1366                 LASSERT(tmt->tmt_committed == 0);
1367                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1368                 st = lookup_sub_thandle(tmt, sub_dt);
1369
1370                 LASSERT(st != NULL);
1371                 LASSERT(st->st_sub_th != NULL);
1372                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1373                                              ta->ta_args[i]);
1374
1375                 /* If the update is to update the reply data, then
1376                  * we need set the session information, so
1377                  * tgt_last_rcvd_update() can be called correctly */
1378                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1379                         update_recovery_update_ses(env, tdtd, th,
1380                                                    st->st_sub_th, dtrq, ta_arg);
1381
1382                 if (unlikely(rc < 0)) {
1383                         CDEBUG(D_HA, "error during execution of #%u from"
1384                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1385                                ta->ta_args[i]->line, rc);
1386                         while (--i > 0) {
1387                                 if (ta->ta_args[i]->undo_fn != NULL) {
1388                                         dt_obj = ta->ta_args[i]->object;
1389                                         sub_dt =
1390                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1391                                         st = lookup_sub_thandle(tmt, sub_dt);
1392                                         LASSERT(st != NULL);
1393                                         LASSERT(st->st_sub_th != NULL);
1394
1395                                         ta->ta_args[i]->undo_fn(env,
1396                                                                st->st_sub_th,
1397                                                                ta->ta_args[i]);
1398                                 } else {
1399                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1400                                              dt_obd_name(ta->ta_handle->th_dev),
1401                                                ta->ta_args[i]->file,
1402                                                ta->ta_args[i]->line, -ENOTSUPP);
1403                                 }
1404                         }
1405                         break;
1406                 }
1407                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1408                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1409         }
1410
1411 stop_trans:
1412         if (rc < 0)
1413                 th->th_result = rc;
1414         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1415         for (i = 0; i < ta->ta_argno; i++) {
1416                 if (ta->ta_args[i]->object != NULL) {
1417                         dt_object_put(env, ta->ta_args[i]->object);
1418                         ta->ta_args[i]->object = NULL;
1419                 }
1420         }
1421
1422         if (tur != NULL)
1423                 tur->tur_update_records = NULL;
1424
1425         if (tgt_ses_info(env)->tsi_exp != NULL) {
1426                 class_export_put(tgt_ses_info(env)->tsi_exp);
1427                 tgt_ses_info(env)->tsi_exp = NULL;
1428         }
1429 exit_session:
1430         lu_context_exit(&session_env);
1431         lu_context_fini(&session_env);
1432         RETURN(rc);
1433 }
1434 EXPORT_SYMBOL(distribute_txn_replay_handle);