Whamcloud - gitweb
LU-6880 update: after reply move dtrq to finish list
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
 * This file implements the methods that handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. These replays come either from client replay requests or
 * from update replay records (for distributed transactions) in the update
 * log. For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * MDTs. For each distributed operation it checks the updates on all MDTs,
 * and if update records are missing on some MDTs, the replay thread redoes
 * the updates on those MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
49 #include <obd.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
52
53 /**
54  * Lookup distribute_txn_replay req
55  *
56  * Lookup distribute_txn_replay in the replay list by batchid.
57  * It is assumed the list has been locked before calling this function.
58  *
59  * \param[in] tdtd      distribute_txn_data, which holds the replay
60  *                      list.
61  * \param[in] batchid   batchid used by lookup.
62  *
63  * \retval              pointer of the replay if succeeds.
64  * \retval              NULL if can not find it.
65  */
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
68 {
69         struct distribute_txn_replay_req        *tmp;
70         struct distribute_txn_replay_req        *dtrq = NULL;
71
72         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73                 if (tmp->dtrq_batchid == batchid) {
74                         dtrq = tmp;
75                         break;
76                 }
77         }
78         return dtrq;
79 }
80
81 /**
82  * insert distribute txn replay req
83  *
84  * Insert distribute txn replay to the replay list, and it assumes the
85  * list has been looked. Note: the replay list is a sorted list, which
86  * is sorted by master transno. It is assumed the replay list has been
87  * locked before calling this function.
88  *
89  * \param[in] tdtd      target distribute txn data where replay list is
90  * \param[in] new       distribute txn replay to be inserted
91  *
92  * \retval              0 if insertion succeeds
93  * \retval              EEXIST if the dtrq already exists
94  */
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96                         struct distribute_txn_replay_req *new)
97 {
98         struct distribute_txn_replay_req *iter;
99
100         /* Check if the dtrq has been added to the list */
101         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
102         if (iter != NULL)
103                 return -EEXIST;
104
105         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
106                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
107                         continue;
108
109                 /* If there are mulitple replay req with same transno, then
110                  * sort them with batchid */
111                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
112                     iter->dtrq_batchid > new->dtrq_batchid)
113                         continue;
114
115                 list_add(&new->dtrq_list, &iter->dtrq_list);
116                 break;
117         }
118
119         if (list_empty(&new->dtrq_list))
120                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
121
122         return 0;
123 }
124
125 /**
126  * create distribute txn replay req
127  *
128  * Allocate distribute txn replay req according to the update records.
129  *
130  * \param[in] tdtd      target distribute txn data where replay list is.
131  * \param[in] record    update records from the update log.
132  *
133  * \retval              the pointer of distribute txn replay req if
134  *                      the creation succeeds.
135  * \retval              NULL if the creation fails.
136  */
137 static struct distribute_txn_replay_req *
138 dtrq_create(struct target_distribute_txn_data *tdtd,
139             struct llog_update_record *lur)
140 {
141         struct distribute_txn_replay_req *new;
142
143         OBD_ALLOC_PTR(new);
144         if (new == NULL)
145                 RETURN(ERR_PTR(-ENOMEM));
146
147         new->dtrq_lur_size = llog_update_record_size(lur);
148         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
149         if (new->dtrq_lur == NULL) {
150                 OBD_FREE_PTR(new);
151                 RETURN(ERR_PTR(-ENOMEM));
152         }
153
154         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
155
156         /* If the transno in the update record is 0, it means the
157          * update are from master MDT, and it will use the master
158          * last committed transno as its master transno. Later, if
159          * the update records are gotten from slave MDTs, then these
160          * transno will be replaced.
161          * See insert_update_records_to_replay_list(). */
162         if (lur->lur_update_rec.ur_master_transno == 0) {
163                 new->dtrq_lur->lur_update_rec.ur_master_transno =
164                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
165                 new->dtrq_master_transno =
166                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
167         } else {
168                 new->dtrq_master_transno =
169                                 lur->lur_update_rec.ur_master_transno;
170         }
171
172         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
173
174         spin_lock_init(&new->dtrq_sub_list_lock);
175         INIT_LIST_HEAD(&new->dtrq_sub_list);
176         INIT_LIST_HEAD(&new->dtrq_list);
177
178         RETURN(new);
179 }
180
181 /**
182  * Lookup distribute sub replay
183  *
184  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
185  * mdt_index.
186  *
187  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
188  * \param[in] mdt_index                 the mdt_index as the key of lookup
189  *
190  * \retval              the pointer of sub replay if it can be found.
191  * \retval              NULL if it can not find.
192  */
193 struct distribute_txn_replay_req_sub *
194 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
195 {
196         struct distribute_txn_replay_req_sub *dtrqs = NULL;
197         struct distribute_txn_replay_req_sub *tmp;
198
199         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
200                 if (tmp->dtrqs_mdt_index == mdt_index) {
201                         dtrqs = tmp;
202                         break;
203                 }
204         }
205         return dtrqs;
206 }
207
208 /**
209  * Try to add cookie to sub distribute txn request
210  *
211  * Check if the update log cookie has been added to the request, if not,
212  * add it to the dtrqs_cookie_list.
213  *
214  * \param[in] dtrqs     sub replay req where cookies to be added.
215  * \param[in] cookie    cookie to be added.
216  *
217  * \retval              0 if the cookie is adding succeeds.
218  * \retval              negative errno if adding fails.
219  */
220 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
221                                struct llog_cookie *cookie)
222 {
223         struct sub_thandle_cookie *new;
224
225         OBD_ALLOC_PTR(new);
226         if (new == NULL)
227                 return -ENOMEM;
228
229         INIT_LIST_HEAD(&new->stc_list);
230         new->stc_cookie = *cookie;
231         /* Note: only single thread will access one sub_request each time,
232          * so no need lock here */
233         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
234
235         return 0;
236 }
237
238 /**
239  * Insert distribute txn sub req replay
240  *
241  * Allocate sub replay req and insert distribute txn replay list.
242  *
243  * \param[in] dtrq      d to be added
244  * \param[in] cookie    the cookie of the update record
245  * \param[in] mdt_index the mdt_index of the update record
246  *
247  * \retval              0 if the adding succeeds.
248  * \retval              negative errno if the adding fails.
249  */
250 static int
251 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
252                            struct llog_cookie *cookie,
253                            __u32 mdt_index)
254 {
255         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
256         struct distribute_txn_replay_req_sub    *new;
257         int                                     rc;
258         ENTRY;
259
260         spin_lock(&dtrq->dtrq_sub_list_lock);
261         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
262         spin_unlock(&dtrq->dtrq_sub_list_lock);
263         if (dtrqs != NULL) {
264                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
265                 RETURN(0);
266         }
267
268         OBD_ALLOC_PTR(new);
269         if (new == NULL)
270                 RETURN(-ENOMEM);
271
272         INIT_LIST_HEAD(&new->dtrqs_list);
273         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
274         new->dtrqs_mdt_index = mdt_index;
275         spin_lock(&dtrq->dtrq_sub_list_lock);
276         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
277         if (dtrqs == NULL) {
278                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
279                 dtrqs = new;
280         } else {
281                 OBD_FREE_PTR(new);
282         }
283         spin_unlock(&dtrq->dtrq_sub_list_lock);
284
285         rc = dtrq_sub_add_cookie(dtrqs, cookie);
286
287         RETURN(rc);
288 }
289
/**
 * Append a partial update record to an existing replay request.
 *
 * Used when one operation's updates are spread over several update
 * records: the ops/params of \a record are appended after the update
 * records already held in dtrq->dtrq_lur, and the update and parameter
 * counts are summed. The record is appended only if its ur_index is exactly
 * one greater than the current ur_index of dtrq_lur. Any other record is
 * skipped and 0 is returned.
 *
 * \param[in] dtrq      the update replay request where the new update
 *                      records will be added.
 * \param[in] record    the new update records to append.
 *
 * \retval              0 if the records were appended or skipped.
 * \retval              -ENOMEM if the enlarged buffer cannot be allocated.
 */
static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
			       struct update_records *record)
{
	struct llog_update_record *new_lur;
	size_t lur_size = dtrq->dtrq_lur_size;
	void *ptr;
	ENTRY;

	/* Several threads might retrieve the same records from different
	 * targets, and only one copy is needed. So only a record whose index
	 * immediately follows the current one is taken; anything else is
	 * skipped. The check and the index bump are done together under
	 * dtrq_sub_list_lock. */
	spin_lock(&dtrq->dtrq_sub_list_lock);
	if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
		spin_unlock(&dtrq->dtrq_sub_list_lock);
		RETURN(0);
	}
	dtrq->dtrq_lur->lur_update_rec.ur_index++;
	spin_unlock(&dtrq->dtrq_sub_list_lock);

	lur_size += update_records_size(record);
	OBD_ALLOC_LARGE(new_lur, lur_size);
	if (new_lur == NULL) {
		/* Undo the index bump so this record can be appended later. */
		spin_lock(&dtrq->dtrq_sub_list_lock);
		dtrq->dtrq_lur->lur_update_rec.ur_index--;
		spin_unlock(&dtrq->dtrq_sub_list_lock);
		RETURN(-ENOMEM);
	}

	/* Copy the old and new records to the new allocated buffer */
	memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
	/* The counts in new_lur are still the old ones here, so ptr points
	 * just past the existing update records. */
	ptr = (char *)&new_lur->lur_update_rec +
		update_records_size(&new_lur->lur_update_rec);
	/* Copy everything in @record from ur_ops up to its end. */
	memcpy(ptr, &record->ur_ops,
	       update_records_size(record) -
	       offsetof(struct update_records, ur_ops));

	new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
	new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
	new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);

	/* Replace the records.
	 * NOTE(review): dtrq_lur is freed and replaced here without holding
	 * dtrq_sub_list_lock, while the index check above dereferences
	 * dtrq->dtrq_lur under that lock. If several threads can append to
	 * the same dtrq concurrently, as the comment above suggests, this
	 * looks racy. TODO confirm. */
	OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
	dtrq->dtrq_lur = new_lur;
	dtrq->dtrq_lur_size = lur_size;
	/* The merged record takes the flags of the last appended part. */
	dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
	update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
	RETURN(0);
}
353
354 /**
355  * Insert update records to the replay list.
356  *
357  * Allocate distribute txn replay req and insert it into the replay
358  * list, then insert the update records into the replay req.
359  *
360  * \param[in] tdtd      distribute txn replay data where the replay list
361  *                      is.
362  * \param[in] record    the update record
363  * \param[in] cookie    cookie of the record
364  * \param[in] index     mdt index of the record
365  *
366  * \retval              0 if the adding succeeds.
367  * \retval              negative errno if the adding fails.
368  */
369 int
370 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
371                                      struct llog_update_record *lur,
372                                      struct llog_cookie *cookie,
373                                      __u32 mdt_index)
374 {
375         struct distribute_txn_replay_req *dtrq;
376         struct update_records *record = &lur->lur_update_rec;
377         bool replace_record = false;
378         int rc = 0;
379         ENTRY;
380
381         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
382                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
383                record->ur_batchid, record->ur_master_transno, mdt_index);
384
385         /* Update batchid if necessary */
386         spin_lock(&tdtd->tdtd_batchid_lock);
387         if (record->ur_batchid >= tdtd->tdtd_batchid) {
388                 CDEBUG(D_HA, "%s update batchid from "LPU64 " to "LPU64"\n",
389                        tdtd->tdtd_lut->lut_obd->obd_name,
390                        tdtd->tdtd_batchid, record->ur_batchid);
391                 tdtd->tdtd_batchid = record->ur_batchid + 1;
392         }
393         spin_unlock(&tdtd->tdtd_batchid_lock);
394
395 again:
396         spin_lock(&tdtd->tdtd_replay_list_lock);
397         /* First try to build the replay update request with the records */
398         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
399         if (dtrq == NULL) {
400                 spin_unlock(&tdtd->tdtd_replay_list_lock);
401                 dtrq = dtrq_create(tdtd, lur);
402                 if (IS_ERR(dtrq))
403                         RETURN(PTR_ERR(dtrq));
404
405                 spin_lock(&tdtd->tdtd_replay_list_lock);
406                 rc = dtrq_insert(tdtd, dtrq);
407                 if (rc < 0) {
408                         spin_unlock(&tdtd->tdtd_replay_list_lock);
409                         dtrq_destroy(dtrq);
410                         if (rc == -EEXIST)
411                                 goto again;
412                         return rc;
413                 }
414         } else {
415                 /* If the master transno in update header is not
416                 * matched with the one in the record, then it means
417                 * the dtrq is originally created by master record,
418                 * so we need update master transno and reposition
419                 * the dtrq(by master transno) in the list and also
420                 * replace update record */
421                 if (record->ur_master_transno != 0 &&
422                     dtrq->dtrq_master_transno != record->ur_master_transno &&
423                     dtrq->dtrq_lur != NULL) {
424                         list_del_init(&dtrq->dtrq_list);
425                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
426                                                 record->ur_master_transno;
427
428                         dtrq->dtrq_master_transno = record->ur_master_transno;
429                         replace_record = true;
430                         /* try to insert again */
431                         rc = dtrq_insert(tdtd, dtrq);
432                         if (rc < 0) {
433                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
434                                 dtrq_destroy(dtrq);
435                                 return rc;
436                         }
437                 }
438         }
439         spin_unlock(&tdtd->tdtd_replay_list_lock);
440
441         /* Because there should be only thread access the update record, so
442          * we do not need lock here */
443         if (replace_record) {
444                 /* Replace the update record and master transno */
445                 OBD_FREE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
446                 dtrq->dtrq_lur = NULL;
447                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
448                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
449                 if (dtrq->dtrq_lur == NULL)
450                         return -ENOMEM;
451
452                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
453         }
454
455         /* This is a partial update records, let's try to append
456          * the record to the current replay request */
457         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
458                 rc = dtrq_append_updates(dtrq, record);
459
460         /* Then create and add sub update request */
461         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
462
463         RETURN(rc);
464 }
465 EXPORT_SYMBOL(insert_update_records_to_replay_list);
466
467 /**
468  * Dump updates of distribute txns.
469  *
470  * Output all of recovery updates in the distribute txn list to the
471  * debug log.
472  *
473  * \param[in] tdtd      distribute txn data where all of distribute txn
474  *                      are listed.
475  * \param[in] mask      debug mask
476  */
477 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
478 {
479         struct distribute_txn_replay_req *dtrq;
480
481         spin_lock(&tdtd->tdtd_replay_list_lock);
482         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
483                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
484                                     false);
485         spin_unlock(&tdtd->tdtd_replay_list_lock);
486 }
487 EXPORT_SYMBOL(dtrq_list_dump);
488
489 /**
490  * Destroy distribute txn replay req
491  *
492  * Destroy distribute txn replay req and all of subs.
493  *
494  * \param[in] dtrq      distribute txn replqy req to be destroyed.
495  */
496 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
497 {
498         struct distribute_txn_replay_req_sub    *dtrqs;
499         struct distribute_txn_replay_req_sub    *tmp;
500
501         LASSERT(list_empty(&dtrq->dtrq_list));
502         spin_lock(&dtrq->dtrq_sub_list_lock);
503         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
504                 struct sub_thandle_cookie *stc;
505                 struct sub_thandle_cookie *tmp;
506
507                 list_del(&dtrqs->dtrqs_list);
508                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
509                                          stc_list) {
510                         list_del(&stc->stc_list);
511                         OBD_FREE_PTR(stc);
512                 }
513                 OBD_FREE_PTR(dtrqs);
514         }
515         spin_unlock(&dtrq->dtrq_sub_list_lock);
516
517         if (dtrq->dtrq_lur != NULL)
518                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
519
520         OBD_FREE_PTR(dtrq);
521 }
522 EXPORT_SYMBOL(dtrq_destroy);
523
524 /**
525  * Destroy all of replay req.
526  *
527  * Destroy all of replay req in the replay list.
528  *
529  * \param[in] tdtd      target distribute txn data where the replay list is.
530  */
531 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
532 {
533         struct distribute_txn_replay_req *dtrq;
534         struct distribute_txn_replay_req *tmp;
535
536         spin_lock(&tdtd->tdtd_replay_list_lock);
537         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
538                                  dtrq_list) {
539                 list_del_init(&dtrq->dtrq_list);
540                 dtrq_destroy(dtrq);
541         }
542         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
543                                  dtrq_list) {
544                 list_del_init(&dtrq->dtrq_list);
545                 dtrq_destroy(dtrq);
546         }
547         spin_unlock(&tdtd->tdtd_replay_list_lock);
548 }
549 EXPORT_SYMBOL(dtrq_list_destroy);
550
551 /**
552  * Get next req in the replay list
553  *
554  * Get next req needs to be replayed, since it is a sorted list
555  * (by master MDT transno)
556  *
557  * \param[in] tdtd      distribute txn data where the replay list is
558  *
559  * \retval              the pointer of update recovery header
560  */
561 struct distribute_txn_replay_req *
562 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
563 {
564         struct distribute_txn_replay_req *dtrq = NULL;
565
566         spin_lock(&tdtd->tdtd_replay_list_lock);
567         if (!list_empty(&tdtd->tdtd_replay_list)) {
568                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
569                                  struct distribute_txn_replay_req, dtrq_list);
570                 list_del_init(&dtrq->dtrq_list);
571         }
572         spin_unlock(&tdtd->tdtd_replay_list_lock);
573
574         return dtrq;
575 }
576 EXPORT_SYMBOL(distribute_txn_get_next_req);
577
578 /**
579  * Get next transno in the replay list, because this is the sorted
580  * list, so it will return the transno of next req in the list.
581  *
582  * \param[in] tdtd      distribute txn data where the replay list is
583  *
584  * \retval              the transno of next update in the list
585  */
586 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
587 {
588         struct distribute_txn_replay_req        *dtrq = NULL;
589         __u64                                   transno = 0;
590
591         spin_lock(&tdtd->tdtd_replay_list_lock);
592         if (!list_empty(&tdtd->tdtd_replay_list)) {
593                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
594                                  struct distribute_txn_replay_req, dtrq_list);
595                 transno = dtrq->dtrq_master_transno;
596         }
597         spin_unlock(&tdtd->tdtd_replay_list_lock);
598
599         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
600                tdtd->tdtd_lut->lut_obd->obd_name, transno);
601         return transno;
602 }
603 EXPORT_SYMBOL(distribute_txn_get_next_transno);
604
605 struct distribute_txn_replay_req *
606 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
607                                   __u64 xid)
608 {
609         struct distribute_txn_replay_req *dtrq = NULL;
610         struct distribute_txn_replay_req *iter;
611
612         spin_lock(&tdtd->tdtd_replay_list_lock);
613         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
614                 if (iter->dtrq_xid == xid) {
615                         dtrq = iter;
616                         break;
617                 }
618         }
619         spin_unlock(&tdtd->tdtd_replay_list_lock);
620         return dtrq;
621 }
622
623 bool is_req_replayed_by_update(struct ptlrpc_request *req)
624 {
625         struct lu_target *tgt = class_exp2tgt(req->rq_export);
626         struct distribute_txn_replay_req *dtrq;
627
628         if (tgt->lut_tdtd == NULL)
629                 return false;
630
631         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
632         if (dtrq == NULL)
633                 return false;
634
635         return true;
636 }
637 EXPORT_SYMBOL(is_req_replayed_by_update);
638
639 /**
640  * Check if the update of one object is committed
641  *
642  * Check whether the update for the object is committed by checking whether
643  * the correspondent sub exists in the replay req. If it is committed, mark
644  * the committed flag in correspondent the sub thandle.
645  *
646  * \param[in] env       execution environment
647  * \param[in] dtrq      replay request
648  * \param[in] dt_obj    object for the update
649  * \param[in] top_th    top thandle
650  * \param[in] sub_th    sub thandle which the update belongs to
651  *
652  * \retval              1 if the update is not committed.
653  * \retval              0 if the update is committed.
654  * \retval              negative errno if some other failures happen.
655  */
656 static int update_is_committed(const struct lu_env *env,
657                                struct distribute_txn_replay_req *dtrq,
658                                struct dt_object *dt_obj,
659                                struct top_thandle *top_th,
660                                struct sub_thandle *st)
661 {
662         struct seq_server_site  *seq_site;
663         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
664         struct distribute_txn_replay_req_sub    *dtrqs;
665         __u32                   mdt_index;
666         ENTRY;
667
668         if (st->st_sub_th != NULL)
669                 RETURN(1);
670
671         if (st->st_committed)
672                 RETURN(0);
673
674         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
675         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
676                 mdt_index = fid_oid(fid);
677         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
678                 mdt_index = seq_site->ss_node_id;
679         } else {
680                 struct lu_server_fld *fld;
681                 struct lu_seq_range range = {0};
682                 int rc;
683
684                 fld = seq_site->ss_server_fld;
685                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
686                 LASSERT(fld->lsf_seq_lookup != NULL);
687                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
688                                          &range);
689                 if (rc < 0)
690                         RETURN(rc);
691                 mdt_index = range.lsr_index;
692         }
693
694         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
695         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
696                 st->st_committed = 1;
697                 if (dtrqs != NULL) {
698                         struct sub_thandle_cookie *stc;
699                         struct sub_thandle_cookie *tmp;
700
701                         list_for_each_entry_safe(stc, tmp,
702                                                  &dtrqs->dtrqs_cookie_list,
703                                                  stc_list)
704                                 list_move(&stc->stc_list, &st->st_cookie_list);
705                 }
706                 RETURN(0);
707         }
708
709         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
710                mdt_index);
711
712         RETURN(1);
713 }
714
715 /**
716  * Implementation of different update methods for update recovery.
717  *
718  * These following functions update_recovery_$(update_name) implement
719  * different updates recovery methods. They will extract the parameters
720  * from the common parameters area and call correspondent dt API to redo
721  * the update.
722  *
723  * \param[in] env       execution environment
724  * \param[in] op        update operation to be replayed
725  * \param[in] params    common update parameters which holds all parameters
726  *                      of the operation
727  * \param[in] th        transaction handle
728  * \param[in] declare   indicate it will do declare or real execution, true
729  *                      means declare, false means real execution
730  *
731  * \retval              0 if it succeeds.
732  * \retval              negative errno if it fails.
733  */
734 static int update_recovery_create(const struct lu_env *env,
735                                   struct dt_object *dt_obj,
736                                   const struct update_op *op,
737                                   const struct update_params *params,
738                                   struct thandle_exec_args *ta,
739                                   struct thandle *th)
740 {
741         struct update_thread_info *uti = update_env_info(env);
742         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
743         struct lu_attr          *attr = &uti->uti_attr;
744         struct obdo             *wobdo;
745         struct obdo             *lobdo = &uti->uti_obdo;
746         struct dt_object_format dof;
747         __u16                   size;
748         unsigned int            param_count;
749         int rc;
750         ENTRY;
751
752         if (dt_object_exists(dt_obj))
753                 RETURN(-EEXIST);
754
755         param_count = lur->lur_update_rec.ur_param_count;
756         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
757                                             param_count, &size);
758         if (wobdo == NULL)
759                 RETURN(-EIO);
760         if (size != sizeof(*wobdo))
761                 RETURN(-EIO);
762
763         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
764                 lustre_swab_obdo(wobdo);
765
766         lustre_get_wire_obdo(NULL, lobdo, wobdo);
767         la_from_obdo(attr, lobdo, lobdo->o_valid);
768
769         dof.dof_type = dt_mode_to_dft(attr->la_mode);
770
771         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
772                            ta, th, NULL, 0);
773
774         RETURN(rc);
775 }
776
777 static int update_recovery_destroy(const struct lu_env *env,
778                                    struct dt_object *dt_obj,
779                                    const struct update_op *op,
780                                    const struct update_params *params,
781                                    struct thandle_exec_args *ta,
782                                    struct thandle *th)
783 {
784         int rc;
785         ENTRY;
786
787         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
788
789         RETURN(rc);
790 }
791
792 static int update_recovery_ref_add(const struct lu_env *env,
793                                    struct dt_object *dt_obj,
794                                    const struct update_op *op,
795                                    const struct update_params *params,
796                                    struct thandle_exec_args *ta,
797                                    struct thandle *th)
798 {
799         int rc;
800         ENTRY;
801
802         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
803
804         RETURN(rc);
805 }
806
807 static int update_recovery_ref_del(const struct lu_env *env,
808                                    struct dt_object *dt_obj,
809                                    const struct update_op *op,
810                                    const struct update_params *params,
811                                    struct thandle_exec_args *ta,
812                                    struct thandle *th)
813 {
814         int rc;
815         ENTRY;
816
817         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
818
819         RETURN(rc);
820 }
821
822 static int update_recovery_attr_set(const struct lu_env *env,
823                                     struct dt_object *dt_obj,
824                                     const struct update_op *op,
825                                     const struct update_params *params,
826                                     struct thandle_exec_args *ta,
827                                     struct thandle *th)
828 {
829         struct update_thread_info *uti = update_env_info(env);
830         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
831         struct obdo     *wobdo;
832         struct obdo     *lobdo = &uti->uti_obdo;
833         struct lu_attr  *attr = &uti->uti_attr;
834         __u16           size;
835         unsigned int    param_count;
836         int             rc;
837         ENTRY;
838
839         param_count = lur->lur_update_rec.ur_param_count;
840         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
841                                             param_count, &size);
842         if (wobdo == NULL)
843                 RETURN(-EIO);
844         if (size != sizeof(*wobdo))
845                 RETURN(-EIO);
846
847         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
848                 lustre_swab_obdo(wobdo);
849
850         lustre_get_wire_obdo(NULL, lobdo, wobdo);
851         la_from_obdo(attr, lobdo, lobdo->o_valid);
852
853         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
854
855         RETURN(rc);
856 }
857
858 static int update_recovery_xattr_set(const struct lu_env *env,
859                                      struct dt_object *dt_obj,
860                                      const struct update_op *op,
861                                      const struct update_params *params,
862                                      struct thandle_exec_args *ta,
863                                      struct thandle *th)
864 {
865         struct update_thread_info *uti = update_env_info(env);
866         char            *buf;
867         char            *name;
868         int             fl;
869         __u16           size;
870         __u32           param_count;
871         int             rc;
872         ENTRY;
873
874         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
875         name = update_params_get_param_buf(params,
876                                            op->uop_params_off[0],
877                                            param_count, &size);
878         if (name == NULL)
879                 RETURN(-EIO);
880
881         buf = update_params_get_param_buf(params,
882                                           op->uop_params_off[1],
883                                           param_count, &size);
884         if (buf == NULL)
885                 RETURN(-EIO);
886
887         uti->uti_buf.lb_buf = buf;
888         uti->uti_buf.lb_len = (size_t)size;
889
890         buf = update_params_get_param_buf(params, op->uop_params_off[2],
891                                           param_count, &size);
892         if (buf == NULL)
893                 RETURN(-EIO);
894         if (size != sizeof(fl))
895                 RETURN(-EIO);
896
897         fl = le32_to_cpu(*(int *)buf);
898
899         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
900                               NULL, 0);
901
902         RETURN(rc);
903 }
904
905 static int update_recovery_index_insert(const struct lu_env *env,
906                                         struct dt_object *dt_obj,
907                                         const struct update_op *op,
908                                         const struct update_params *params,
909                                         struct thandle_exec_args *ta,
910                                         struct thandle *th)
911 {
912         struct update_thread_info *uti = update_env_info(env);
913         struct lu_fid           *fid;
914         char                    *name;
915         __u32                   param_count;
916         __u32                   *ptype;
917         __u32                   type;
918         __u16                   size;
919         int rc;
920         ENTRY;
921
922         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
923         name = update_params_get_param_buf(params, op->uop_params_off[0],
924                                            param_count, &size);
925         if (name == NULL)
926                 RETURN(-EIO);
927
928         fid = update_params_get_param_buf(params, op->uop_params_off[1],
929                                           param_count, &size);
930         if (fid == NULL)
931                 RETURN(-EIO);
932         if (size != sizeof(*fid))
933                 RETURN(-EIO);
934
935         fid_le_to_cpu(fid, fid);
936
937         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
938                                             param_count, &size);
939         if (ptype == NULL)
940                 RETURN(-EIO);
941         if (size != sizeof(*ptype))
942                 RETURN(-EIO);
943         type = le32_to_cpu(*ptype);
944
945         if (dt_try_as_dir(env, dt_obj) == 0)
946                 RETURN(-ENOTDIR);
947
948         uti->uti_rec.rec_fid = fid;
949         uti->uti_rec.rec_type = type;
950
951         rc = out_tx_index_insert(env, dt_obj,
952                                  (const struct dt_rec *)&uti->uti_rec,
953                                  (const struct dt_key *)name, ta, th,
954                                  NULL, 0);
955
956         RETURN(rc);
957 }
958
959 static int update_recovery_index_delete(const struct lu_env *env,
960                                         struct dt_object *dt_obj,
961                                         const struct update_op *op,
962                                         const struct update_params *params,
963                                         struct thandle_exec_args *ta,
964                                         struct thandle *th)
965 {
966         struct update_thread_info *uti = update_env_info(env);
967         __u32   param_count;
968         char    *name;
969         __u16   size;
970         int     rc;
971         ENTRY;
972
973         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
974         name = update_params_get_param_buf(params, op->uop_params_off[0],
975                                            param_count, &size);
976         if (name == NULL)
977                 RETURN(-EIO);
978
979         if (dt_try_as_dir(env, dt_obj) == 0)
980                 RETURN(-ENOTDIR);
981
982         rc = out_tx_index_delete(env, dt_obj,
983                                  (const struct dt_key *)name, ta, th, NULL, 0);
984
985         RETURN(rc);
986 }
987
988 static int update_recovery_write(const struct lu_env *env,
989                                  struct dt_object *dt_obj,
990                                  const struct update_op *op,
991                                  const struct update_params *params,
992                                  struct thandle_exec_args *ta,
993                                  struct thandle *th)
994 {
995         struct update_thread_info *uti = update_env_info(env);
996         char            *buf;
997         __u32           param_count;
998         __u64           pos;
999         __u16           size;
1000         int rc;
1001         ENTRY;
1002
1003         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1004         buf = update_params_get_param_buf(params, op->uop_params_off[0],
1005                                           param_count, &size);
1006         if (buf == NULL)
1007                 RETURN(-EIO);
1008
1009         uti->uti_buf.lb_buf = buf;
1010         uti->uti_buf.lb_len = size;
1011
1012         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1013                                           param_count, &size);
1014         if (buf == NULL)
1015                 RETURN(-EIO);
1016
1017         pos = le64_to_cpu(*(__u64 *)buf);
1018
1019         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1020                           ta, th, NULL, 0);
1021
1022         RETURN(rc);
1023 }
1024
1025 static int update_recovery_xattr_del(const struct lu_env *env,
1026                                      struct dt_object *dt_obj,
1027                                      const struct update_op *op,
1028                                      const struct update_params *params,
1029                                      struct thandle_exec_args *ta,
1030                                      struct thandle *th)
1031 {
1032         struct update_thread_info *uti = update_env_info(env);
1033         __u32   param_count;
1034         char    *name;
1035         __u16   size;
1036         int     rc;
1037         ENTRY;
1038
1039         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1040         name = update_params_get_param_buf(params, op->uop_params_off[0],
1041                                            param_count, &size);
1042         if (name == NULL)
1043                 RETURN(-EIO);
1044
1045         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1046
1047         RETURN(rc);
1048 }
1049
1050 /**
1051  * Update session information
1052  *
1053  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1054  * can be called correctly during update replay.
1055  *
1056  * \param[in] env       execution environment.
1057  * \param[in] tdtd      distribute data structure of the recovering tgt.
1058  * \param[in] th        thandle of this update replay.
1059  * \param[in] master_th master sub thandle.
1060  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1061  *                      reply data.
1062  */
1063 static void update_recovery_update_ses(struct lu_env *env,
1064                                       struct target_distribute_txn_data *tdtd,
1065                                       struct thandle *th,
1066                                       struct thandle *master_th,
1067                                       struct distribute_txn_replay_req *dtrq,
1068                                       struct tx_arg *ta_arg)
1069 {
1070         struct tgt_session_info *tsi;
1071         struct lu_target        *lut = tdtd->tdtd_lut;
1072         struct obd_export       *export;
1073         struct cfs_hash         *hash;
1074         struct top_thandle      *top_th;
1075         struct lsd_reply_data   *lrd;
1076         size_t                  size;
1077
1078         tsi = tgt_ses_info(env);
1079         if (tsi->tsi_exp != NULL)
1080                 return;
1081
1082         size = ta_arg->u.write.buf.lb_len;
1083         lrd = ta_arg->u.write.buf.lb_buf;
1084         if (size != sizeof(*lrd) || lrd == NULL)
1085                 return;
1086
1087         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1088         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1089         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1090         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1091         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1092
1093         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1094                 return;
1095
1096         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1097         if (hash == NULL)
1098                 return;
1099
1100         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1101         if (export == NULL) {
1102                 cfs_hash_putref(hash);
1103                 return;
1104         }
1105
1106         tsi->tsi_exp = export;
1107         tsi->tsi_xid = lrd->lrd_xid;
1108         tsi->tsi_opdata = lrd->lrd_data;
1109         tsi->tsi_result = lrd->lrd_result;
1110         tsi->tsi_client_gen = lrd->lrd_client_gen;
1111         dtrq->dtrq_xid = lrd->lrd_xid;
1112         top_th = container_of(th, struct top_thandle, tt_super);
1113         top_th->tt_master_sub_thandle = master_th;
1114         cfs_hash_putref(hash);
1115 }
1116
1117 /**
1118  * Execute updates in the update replay records
1119  *
1120  * Declare distribute txn replay by update records and add the updates
1121  * to the execution list. Note: it will check if the update has been
1122  * committed, and only execute the updates if it is not committed to
1123  * disk.
1124  *
1125  * \param[in] env       execution environment
1126  * \param[in] tdtd      distribute txn replay data which hold all of replay
1127  *                      reqs and all replay parameters.
1128  * \param[in] dtrq      distribute transaction replay req.
1129  * \param[in] ta        thandle execute args.
1130  *
1131  * \retval              0 if declare succeeds.
1132  * \retval              negative errno if declare fails.
1133  */
1134 static int update_recovery_exec(const struct lu_env *env,
1135                                 struct target_distribute_txn_data *tdtd,
1136                                 struct distribute_txn_replay_req *dtrq,
1137                                 struct thandle_exec_args *ta)
1138 {
1139         struct llog_update_record *lur = dtrq->dtrq_lur;
1140         struct update_records   *records = &lur->lur_update_rec;
1141         struct update_ops       *ops = &records->ur_ops;
1142         struct update_params    *params = update_records_get_params(records);
1143         struct top_thandle      *top_th = container_of(ta->ta_handle,
1144                                                        struct top_thandle,
1145                                                        tt_super);
1146         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1147         struct update_op        *op;
1148         unsigned int            i;
1149         int                     rc = 0;
1150         ENTRY;
1151
1152         /* These records have been swabbed in llog_cat_process() */
1153         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1154              i++, op = update_op_next_op(op)) {
1155                 struct lu_fid           *fid = &op->uop_fid;
1156                 struct dt_object        *dt_obj;
1157                 struct dt_object        *sub_dt_obj;
1158                 struct dt_device        *sub_dt;
1159                 struct sub_thandle      *st;
1160
1161                 if (op->uop_type == OUT_NOOP)
1162                         continue;
1163
1164                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1165                 if (IS_ERR(dt_obj)) {
1166                         rc = PTR_ERR(dt_obj);
1167                         break;
1168                 }
1169                 sub_dt_obj = dt_object_child(dt_obj);
1170
1171                 /* Create sub thandle if not */
1172                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1173                 st = lookup_sub_thandle(tmt, sub_dt);
1174                 if (st == NULL) {
1175                         st = create_sub_thandle(tmt, sub_dt);
1176                         if (IS_ERR(st))
1177                                 GOTO(next, rc = PTR_ERR(st));
1178                 }
1179
1180                 /* check if updates on the OSD/OSP are committed */
1181                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1182                 if (rc == 0)
1183                         /* If this is committed, goto next */
1184                         goto next;
1185
1186                 if (rc < 0)
1187                         GOTO(next, rc);
1188
1189                 /* Create thandle for sub thandle if needed */
1190                 if (st->st_sub_th == NULL) {
1191                         rc = sub_thandle_trans_create(env, top_th, st);
1192                         if (rc != 0)
1193                                 GOTO(next, rc);
1194                 }
1195
1196                 CDEBUG(D_HA, "replay %uth update\n", i);
1197                 switch (op->uop_type) {
1198                 case OUT_CREATE:
1199                         rc = update_recovery_create(env, sub_dt_obj,
1200                                                     op, params, ta,
1201                                                     st->st_sub_th);
1202                         break;
1203                 case OUT_DESTROY:
1204                         rc = update_recovery_destroy(env, sub_dt_obj,
1205                                                      op, params, ta,
1206                                                      st->st_sub_th);
1207                         break;
1208                 case OUT_REF_ADD:
1209                         rc = update_recovery_ref_add(env, sub_dt_obj,
1210                                                      op, params, ta,
1211                                                      st->st_sub_th);
1212                         break;
1213                 case OUT_REF_DEL:
1214                         rc = update_recovery_ref_del(env, sub_dt_obj,
1215                                                      op, params, ta,
1216                                                      st->st_sub_th);
1217                         break;
1218                 case OUT_ATTR_SET:
1219                         rc = update_recovery_attr_set(env, sub_dt_obj,
1220                                                       op, params, ta,
1221                                                       st->st_sub_th);
1222                         break;
1223                 case OUT_XATTR_SET:
1224                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1225                                                        op, params, ta,
1226                                                        st->st_sub_th);
1227                         break;
1228                 case OUT_INDEX_INSERT:
1229                         rc = update_recovery_index_insert(env, sub_dt_obj,
1230                                                           op, params, ta,
1231                                                           st->st_sub_th);
1232                         break;
1233                 case OUT_INDEX_DELETE:
1234                         rc = update_recovery_index_delete(env, sub_dt_obj,
1235                                                           op, params, ta,
1236                                                           st->st_sub_th);
1237                         break;
1238                 case OUT_WRITE:
1239                         rc = update_recovery_write(env, sub_dt_obj,
1240                                                    op, params, ta,
1241                                                    st->st_sub_th);
1242                         break;
1243                 case OUT_XATTR_DEL:
1244                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1245                                                        op, params, ta,
1246                                                        st->st_sub_th);
1247                         break;
1248                 default:
1249                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1250                         rc = -EINVAL;
1251                         break;
1252                 }
1253 next:
1254                 lu_object_put(env, &dt_obj->do_lu);
1255                 if (rc < 0)
1256                         break;
1257         }
1258
1259         ta->ta_handle->th_result = rc;
1260         RETURN(rc);
1261 }
1262
1263 /**
1264  * redo updates on MDT if needed.
1265  *
1266  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1267  * this function to replay distribute txn updates on all MDTs. It only replay
1268  * updates on the MDT where the update record is missing.
1269  *
1270  * If the update already exists on the MDT, then it does not need replay the
1271  * updates on that MDT, and only mark the sub transaction has been committed
1272  * there.
1273  *
1274  * \param[in] env       execution environment
1275  * \param[in] tdtd      target distribute txn data, which holds the replay list
1276  *                      and all parameters needed by replay process.
1277  * \param[in] dtrq      distribute txn replay req.
1278  *
1279  * \retval              0 if replay succeeds.
1280  * \retval              negative errno if replay failes.
1281  */
1282 int distribute_txn_replay_handle(struct lu_env *env,
1283                                  struct target_distribute_txn_data *tdtd,
1284                                  struct distribute_txn_replay_req *dtrq)
1285 {
1286         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1287         struct thandle_exec_args *ta;
1288         struct lu_context       session_env;
1289         struct thandle          *th = NULL;
1290         struct top_thandle      *top_th;
1291         struct top_multiple_thandle *tmt;
1292         struct thandle_update_records *tur = NULL;
1293         int                     i;
1294         int                     rc = 0;
1295         ENTRY;
1296
1297         /* initialize session, it is needed for the handler of target */
1298         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1299         if (rc) {
1300                 CERROR("%s: failure to initialize session: rc = %d\n",
1301                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1302                 RETURN(rc);
1303         }
1304         lu_context_enter(&session_env);
1305         env->le_ses = &session_env;
1306         lu_env_refill(env);
1307         update_records_dump(records, D_HA, true);
1308         th = top_trans_create(env, NULL);
1309         if (IS_ERR(th))
1310                 GOTO(exit_session, rc = PTR_ERR(th));
1311
1312         ta = &update_env_info(env)->uti_tea;
1313         ta->ta_argno = 0;
1314
1315         update_env_info(env)->uti_dtrq = dtrq;
1316         /* Create distribute transaction structure for this top thandle */
1317         top_th = container_of(th, struct top_thandle, tt_super);
1318         rc = top_trans_create_tmt(env, top_th);
1319         if (rc < 0)
1320                 GOTO(stop_trans, rc);
1321
1322         th->th_dev = tdtd->tdtd_dt;
1323         ta->ta_handle = th;
1324
1325         /* check if the distribute transaction has been committed */
1326         tmt = top_th->tt_multiple_thandle;
1327         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1328         tmt->tmt_batchid = dtrq->dtrq_batchid;
1329         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1330
1331         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1332                 tmt->tmt_committed = 1;
1333
1334         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1335         if (rc < 0)
1336                 GOTO(stop_trans, rc);
1337
1338         /* If no updates are needed to be replayed, then mark this records as
1339          * committed, so commit thread distribute_txn_commit_thread() will
1340          * delete the record */
1341         if (ta->ta_argno == 0)
1342                 tmt->tmt_committed = 1;
1343
1344         tur = &update_env_info(env)->uti_tur;
1345         tur->tur_update_records = dtrq->dtrq_lur;
1346         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1347         tur->tur_update_params = NULL;
1348         tur->tur_update_param_count = 0;
1349         tmt->tmt_update_records = tur;
1350
1351         distribute_txn_insert_by_batchid(tmt);
1352         rc = top_trans_start(env, NULL, th);
1353         if (rc < 0)
1354                 GOTO(stop_trans, rc);
1355
1356         for (i = 0; i < ta->ta_argno; i++) {
1357                 struct tx_arg           *ta_arg;
1358                 struct dt_object        *dt_obj;
1359                 struct dt_device        *sub_dt;
1360                 struct sub_thandle      *st;
1361
1362                 ta_arg = ta->ta_args[i];
1363                 dt_obj = ta_arg->object;
1364
1365                 LASSERT(tmt->tmt_committed == 0);
1366                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1367                 st = lookup_sub_thandle(tmt, sub_dt);
1368
1369                 LASSERT(st != NULL);
1370                 LASSERT(st->st_sub_th != NULL);
1371                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1372                                              ta->ta_args[i]);
1373
1374                 /* If the update is to update the reply data, then
1375                  * we need set the session information, so
1376                  * tgt_last_rcvd_update() can be called correctly */
1377                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1378                         update_recovery_update_ses(env, tdtd, th,
1379                                                    st->st_sub_th, dtrq, ta_arg);
1380
1381                 if (unlikely(rc < 0)) {
1382                         CDEBUG(D_HA, "error during execution of #%u from"
1383                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1384                                ta->ta_args[i]->line, rc);
1385                         while (--i > 0) {
1386                                 if (ta->ta_args[i]->undo_fn != NULL) {
1387                                         dt_obj = ta->ta_args[i]->object;
1388                                         sub_dt =
1389                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1390                                         st = lookup_sub_thandle(tmt, sub_dt);
1391                                         LASSERT(st != NULL);
1392                                         LASSERT(st->st_sub_th != NULL);
1393
1394                                         ta->ta_args[i]->undo_fn(env,
1395                                                                st->st_sub_th,
1396                                                                ta->ta_args[i]);
1397                                 } else {
1398                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1399                                              dt_obd_name(ta->ta_handle->th_dev),
1400                                                ta->ta_args[i]->file,
1401                                                ta->ta_args[i]->line, -ENOTSUPP);
1402                                 }
1403                         }
1404                         break;
1405                 }
1406                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1407                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1408         }
1409
1410 stop_trans:
1411         if (rc < 0)
1412                 th->th_result = rc;
1413         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1414         for (i = 0; i < ta->ta_argno; i++) {
1415                 if (ta->ta_args[i]->object != NULL) {
1416                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1417                         ta->ta_args[i]->object = NULL;
1418                 }
1419         }
1420
1421         if (tur != NULL)
1422                 tur->tur_update_records = NULL;
1423
1424         if (tgt_ses_info(env)->tsi_exp != NULL) {
1425                 class_export_put(tgt_ses_info(env)->tsi_exp);
1426                 tgt_ses_info(env)->tsi_exp = NULL;
1427         }
1428 exit_session:
1429         lu_context_exit(&session_env);
1430         lu_context_fini(&session_env);
1431         RETURN(rc);
1432 }
1433 EXPORT_SYMBOL(distribute_txn_replay_handle);