Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2015, 2017, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
29  * This file implements the methods to handle the update recovery.
30  *
31  * During DNE recovery, the recovery thread redoes operations in transaction
32  * number order. Each replayed operation comes either from a client replay
33  * request or from update replay records (for distributed transactions) in the
34  * update log. For distributed transaction replay, the replay thread calls
35  * distribute_txn_replay_handle() to handle the updates.
36  *
37  * After the Master MDT restarts, it will retrieve the update records from all
38  * of MDTs, for each distributed operation, it will check updates on all MDTs,
39  * if some updates records are missing on some MDTs, the replay thread will redo
40  * updates on these MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <lustre_obdo.h>
48 #include <lustre_update.h>
49 #include <lustre_swab.h>
50 #include <md_object.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "tgt_internal.h"
55
56 /**
57  * Lookup distribute_txn_replay req
58  *
59  * Lookup distribute_txn_replay in the replay list by batchid.
60  * It is assumed the list has been locked before calling this function.
61  *
62  * \param[in] tdtd      distribute_txn_data, which holds the replay
63  *                      list.
64  * \param[in] batchid   batchid used by lookup.
65  *
66  * \retval              pointer of the replay if succeeds.
67  * \retval              NULL if can not find it.
68  */
69 static struct distribute_txn_replay_req *
70 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
71 {
72         struct distribute_txn_replay_req        *tmp;
73         struct distribute_txn_replay_req        *dtrq = NULL;
74
75         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
76                 if (tmp->dtrq_batchid == batchid) {
77                         dtrq = tmp;
78                         break;
79                 }
80         }
81         return dtrq;
82 }
83
84 /**
85  * insert distribute txn replay req
86  *
87  * Insert distribute txn replay to the replay list, and it assumes the
88  * list has been looked. Note: the replay list is a sorted list, which
89  * is sorted by master transno. It is assumed the replay list has been
90  * locked before calling this function.
91  *
92  * \param[in] tdtd      target distribute txn data where replay list is
93  * \param[in] new       distribute txn replay to be inserted
94  *
95  * \retval              0 if insertion succeeds
96  * \retval              EEXIST if the dtrq already exists
97  */
98 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
99                         struct distribute_txn_replay_req *new)
100 {
101         struct distribute_txn_replay_req *iter;
102
103         /* Check if the dtrq has been added to the list */
104         iter = dtrq_lookup(tdtd, new->dtrq_batchid);
105         if (iter != NULL)
106                 return -EEXIST;
107
108         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
109                 if (iter->dtrq_master_transno > new->dtrq_master_transno)
110                         continue;
111
112                 /* If there are mulitple replay req with same transno, then
113                  * sort them with batchid */
114                 if (iter->dtrq_master_transno == new->dtrq_master_transno &&
115                     iter->dtrq_batchid > new->dtrq_batchid)
116                         continue;
117
118                 list_add(&new->dtrq_list, &iter->dtrq_list);
119                 break;
120         }
121
122         if (list_empty(&new->dtrq_list))
123                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
124
125         return 0;
126 }
127
128 /**
129  * create distribute txn replay req
130  *
131  * Allocate distribute txn replay req according to the update records.
132  *
133  * \param[in] tdtd      target distribute txn data where replay list is.
134  * \param[in] record    update records from the update log.
135  *
136  * \retval              the pointer of distribute txn replay req if
137  *                      the creation succeeds.
138  * \retval              NULL if the creation fails.
139  */
140 static struct distribute_txn_replay_req *
141 dtrq_create(struct target_distribute_txn_data *tdtd,
142             struct llog_update_record *lur)
143 {
144         struct distribute_txn_replay_req *new;
145
146         OBD_ALLOC_PTR(new);
147         if (new == NULL)
148                 RETURN(ERR_PTR(-ENOMEM));
149
150         new->dtrq_lur_size = llog_update_record_size(lur);
151         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
152         if (new->dtrq_lur == NULL) {
153                 OBD_FREE_PTR(new);
154                 RETURN(ERR_PTR(-ENOMEM));
155         }
156
157         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
158
159         /* If the transno in the update record is 0, it means the
160          * update are from master MDT, and it will use the master
161          * last committed transno as its master transno. Later, if
162          * the update records are gotten from slave MDTs, then these
163          * transno will be replaced.
164          * See insert_update_records_to_replay_list(). */
165         if (lur->lur_update_rec.ur_master_transno == 0) {
166                 new->dtrq_lur->lur_update_rec.ur_master_transno =
167                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
168                 new->dtrq_master_transno =
169                                 tdtd->tdtd_lut->lut_obd->obd_last_committed;
170         } else {
171                 new->dtrq_master_transno =
172                                 lur->lur_update_rec.ur_master_transno;
173         }
174
175         new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
176
177         spin_lock_init(&new->dtrq_sub_list_lock);
178         INIT_LIST_HEAD(&new->dtrq_sub_list);
179         INIT_LIST_HEAD(&new->dtrq_list);
180
181         RETURN(new);
182 }
183
184 /**
185  * Lookup distribute sub replay
186  *
187  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
188  * mdt_index.
189  *
190  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
191  * \param[in] mdt_index                 the mdt_index as the key of lookup
192  *
193  * \retval              the pointer of sub replay if it can be found.
194  * \retval              NULL if it can not find.
195  */
196 struct distribute_txn_replay_req_sub *
197 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
198 {
199         struct distribute_txn_replay_req_sub *dtrqs = NULL;
200         struct distribute_txn_replay_req_sub *tmp;
201
202         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
203                 if (tmp->dtrqs_mdt_index == mdt_index) {
204                         dtrqs = tmp;
205                         break;
206                 }
207         }
208         return dtrqs;
209 }
210
211 /**
212  * Try to add cookie to sub distribute txn request
213  *
214  * Check if the update log cookie has been added to the request, if not,
215  * add it to the dtrqs_cookie_list.
216  *
217  * \param[in] dtrqs     sub replay req where cookies to be added.
218  * \param[in] cookie    cookie to be added.
219  *
220  * \retval              0 if the cookie is adding succeeds.
221  * \retval              negative errno if adding fails.
222  */
223 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
224                                struct llog_cookie *cookie)
225 {
226         struct sub_thandle_cookie *new;
227
228         OBD_ALLOC_PTR(new);
229         if (new == NULL)
230                 return -ENOMEM;
231
232         INIT_LIST_HEAD(&new->stc_list);
233         new->stc_cookie = *cookie;
234         /* Note: only single thread will access one sub_request each time,
235          * so no need lock here */
236         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
237
238         return 0;
239 }
240
241 /**
242  * Insert distribute txn sub req replay
243  *
244  * Allocate sub replay req and insert distribute txn replay list.
245  *
246  * \param[in] dtrq      d to be added
247  * \param[in] cookie    the cookie of the update record
248  * \param[in] mdt_index the mdt_index of the update record
249  *
250  * \retval              0 if the adding succeeds.
251  * \retval              negative errno if the adding fails.
252  */
253 static int
254 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
255                            struct llog_cookie *cookie,
256                            __u32 mdt_index)
257 {
258         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
259         struct distribute_txn_replay_req_sub    *new;
260         int                                     rc;
261         ENTRY;
262
263         spin_lock(&dtrq->dtrq_sub_list_lock);
264         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
265         spin_unlock(&dtrq->dtrq_sub_list_lock);
266         if (dtrqs != NULL) {
267                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
268                 RETURN(0);
269         }
270
271         OBD_ALLOC_PTR(new);
272         if (new == NULL)
273                 RETURN(-ENOMEM);
274
275         INIT_LIST_HEAD(&new->dtrqs_list);
276         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
277         new->dtrqs_mdt_index = mdt_index;
278         spin_lock(&dtrq->dtrq_sub_list_lock);
279         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
280         if (dtrqs == NULL) {
281                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
282                 dtrqs = new;
283         } else {
284                 OBD_FREE_PTR(new);
285         }
286         spin_unlock(&dtrq->dtrq_sub_list_lock);
287
288         rc = dtrq_sub_add_cookie(dtrqs, cookie);
289
290         RETURN(rc);
291 }
292
293 /**
294  * append updates to the current replay updates
295  *
296  * Append more updates to the existent replay update. And this is only
297  * used when combining mulitple updates into one large updates during
298  * replay.
299  *
300  * \param[in] dtrq      the update replay request where the new update
301  *                      records will be added.
302  * \param[in] lur       the new update record.
303  *
304  * \retval              0 if appending succeeds.
305  * \retval              negative errno if appending fails.
306  */
307 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
308                                struct update_records *record)
309 {
310         struct llog_update_record *new_lur;
311         size_t lur_size = dtrq->dtrq_lur_size;
312         void *ptr;
313         ENTRY;
314
315         /* Because several threads might retrieve the same records from
316          * different targets, and we only need one copy of records. So
317          * we will check if the records is in the next one, if not, just
318          * skip it */
319         spin_lock(&dtrq->dtrq_sub_list_lock);
320         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
321                 spin_unlock(&dtrq->dtrq_sub_list_lock);
322                 RETURN(0);
323         }
324         dtrq->dtrq_lur->lur_update_rec.ur_index++;
325         spin_unlock(&dtrq->dtrq_sub_list_lock);
326
327         lur_size += update_records_size(record);
328         OBD_ALLOC_LARGE(new_lur, lur_size);
329         if (new_lur == NULL) {
330                 spin_lock(&dtrq->dtrq_sub_list_lock);
331                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
332                 spin_unlock(&dtrq->dtrq_sub_list_lock);
333                 RETURN(-ENOMEM);
334         }
335
336         /* Copy the old and new records to the new allocated buffer */
337         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
338         ptr = (char *)&new_lur->lur_update_rec +
339                 update_records_size(&new_lur->lur_update_rec);
340         memcpy(ptr, &record->ur_ops,
341                update_records_size(record) -
342                offsetof(struct update_records, ur_ops));
343
344         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
345         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
346         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
347
348         /* Replace the records */
349         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
350         dtrq->dtrq_lur = new_lur;
351         dtrq->dtrq_lur_size = lur_size;
352         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
353         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
354         RETURN(0);
355 }
356
357 /**
358  * Insert update records to the replay list.
359  *
360  * Allocate distribute txn replay req and insert it into the replay
361  * list, then insert the update records into the replay req.
362  *
363  * \param[in] tdtd      distribute txn replay data where the replay list
364  *                      is.
365  * \param[in] record    the update record
366  * \param[in] cookie    cookie of the record
367  * \param[in] index     mdt index of the record
368  *
369  * \retval              0 if the adding succeeds.
370  * \retval              negative errno if the adding fails.
371  */
372 int
373 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
374                                      struct llog_update_record *lur,
375                                      struct llog_cookie *cookie,
376                                      __u32 mdt_index)
377 {
378         struct distribute_txn_replay_req *dtrq;
379         struct update_records *record = &lur->lur_update_rec;
380         bool replace_record = false;
381         int rc = 0;
382         ENTRY;
383
384         CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
385                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
386                record->ur_batchid, record->ur_master_transno, mdt_index);
387
388         /* Update batchid if necessary */
389         spin_lock(&tdtd->tdtd_batchid_lock);
390         if (record->ur_batchid >= tdtd->tdtd_batchid) {
391                 CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
392                        tdtd->tdtd_lut->lut_obd->obd_name,
393                        tdtd->tdtd_batchid, record->ur_batchid);
394                 tdtd->tdtd_batchid = record->ur_batchid + 1;
395         }
396         spin_unlock(&tdtd->tdtd_batchid_lock);
397
398 again:
399         spin_lock(&tdtd->tdtd_replay_list_lock);
400         /* First try to build the replay update request with the records */
401         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
402         if (dtrq == NULL) {
403                 spin_unlock(&tdtd->tdtd_replay_list_lock);
404                 dtrq = dtrq_create(tdtd, lur);
405                 if (IS_ERR(dtrq))
406                         RETURN(PTR_ERR(dtrq));
407
408                 spin_lock(&tdtd->tdtd_replay_list_lock);
409                 rc = dtrq_insert(tdtd, dtrq);
410                 if (rc < 0) {
411                         spin_unlock(&tdtd->tdtd_replay_list_lock);
412                         dtrq_destroy(dtrq);
413                         if (rc == -EEXIST)
414                                 goto again;
415                         return rc;
416                 }
417         } else {
418                 /* If the master transno in update header is not
419                 * matched with the one in the record, then it means
420                 * the dtrq is originally created by master record,
421                 * so we need update master transno and reposition
422                 * the dtrq(by master transno) in the list and also
423                 * replace update record */
424                 if (record->ur_master_transno != 0 &&
425                     dtrq->dtrq_master_transno != record->ur_master_transno &&
426                     dtrq->dtrq_lur != NULL) {
427                         list_del_init(&dtrq->dtrq_list);
428                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
429                                                 record->ur_master_transno;
430
431                         dtrq->dtrq_master_transno = record->ur_master_transno;
432                         replace_record = true;
433                         /* try to insert again */
434                         rc = dtrq_insert(tdtd, dtrq);
435                         if (rc < 0) {
436                                 spin_unlock(&tdtd->tdtd_replay_list_lock);
437                                 dtrq_destroy(dtrq);
438                                 return rc;
439                         }
440                 }
441         }
442         spin_unlock(&tdtd->tdtd_replay_list_lock);
443
444         /* Because there should be only thread access the update record, so
445          * we do not need lock here */
446         if (replace_record) {
447                 /* Replace the update record and master transno */
448                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
449                 dtrq->dtrq_lur = NULL;
450                 dtrq->dtrq_lur_size = llog_update_record_size(lur);
451                 OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
452                 if (dtrq->dtrq_lur == NULL)
453                         return -ENOMEM;
454
455                 memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
456         }
457
458         /* This is a partial update records, let's try to append
459          * the record to the current replay request */
460         if (record->ur_flags & UPDATE_RECORD_CONTINUE)
461                 rc = dtrq_append_updates(dtrq, record);
462
463         /* Then create and add sub update request */
464         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
465
466         RETURN(rc);
467 }
468 EXPORT_SYMBOL(insert_update_records_to_replay_list);
469
470 /**
471  * Dump updates of distribute txns.
472  *
473  * Output all of recovery updates in the distribute txn list to the
474  * debug log.
475  *
476  * \param[in] tdtd      distribute txn data where all of distribute txn
477  *                      are listed.
478  * \param[in] mask      debug mask
479  */
480 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
481 {
482         struct distribute_txn_replay_req *dtrq;
483
484         spin_lock(&tdtd->tdtd_replay_list_lock);
485         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
486                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
487                                     false);
488         spin_unlock(&tdtd->tdtd_replay_list_lock);
489 }
490 EXPORT_SYMBOL(dtrq_list_dump);
491
492 /**
493  * Destroy distribute txn replay req
494  *
495  * Destroy distribute txn replay req and all of subs.
496  *
497  * \param[in] dtrq      distribute txn replqy req to be destroyed.
498  */
499 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
500 {
501         struct distribute_txn_replay_req_sub    *dtrqs;
502         struct distribute_txn_replay_req_sub    *tmp;
503
504         LASSERT(list_empty(&dtrq->dtrq_list));
505         CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
506                dtrq->dtrq_master_transno);
507         spin_lock(&dtrq->dtrq_sub_list_lock);
508         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
509                 struct sub_thandle_cookie *stc;
510                 struct sub_thandle_cookie *tmp;
511
512                 list_del(&dtrqs->dtrqs_list);
513                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
514                                          stc_list) {
515                         list_del(&stc->stc_list);
516                         OBD_FREE_PTR(stc);
517                 }
518                 OBD_FREE_PTR(dtrqs);
519         }
520         spin_unlock(&dtrq->dtrq_sub_list_lock);
521
522         if (dtrq->dtrq_lur != NULL)
523                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
524
525         OBD_FREE_PTR(dtrq);
526 }
527 EXPORT_SYMBOL(dtrq_destroy);
528
529 /**
530  * Destroy all of replay req.
531  *
532  * Destroy all of replay req in the replay list.
533  *
534  * \param[in] tdtd      target distribute txn data where the replay list is.
535  */
536 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
537 {
538         struct distribute_txn_replay_req *dtrq;
539         struct distribute_txn_replay_req *tmp;
540
541         spin_lock(&tdtd->tdtd_replay_list_lock);
542         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
543                                  dtrq_list) {
544                 list_del_init(&dtrq->dtrq_list);
545                 dtrq_destroy(dtrq);
546         }
547         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
548                                  dtrq_list) {
549                 list_del_init(&dtrq->dtrq_list);
550                 dtrq_destroy(dtrq);
551         }
552         spin_unlock(&tdtd->tdtd_replay_list_lock);
553 }
554 EXPORT_SYMBOL(dtrq_list_destroy);
555
556 /**
557  * Get next req in the replay list
558  *
559  * Get next req needs to be replayed, since it is a sorted list
560  * (by master MDT transno)
561  *
562  * \param[in] tdtd      distribute txn data where the replay list is
563  *
564  * \retval              the pointer of update recovery header
565  */
566 struct distribute_txn_replay_req *
567 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
568 {
569         struct distribute_txn_replay_req *dtrq = NULL;
570
571         spin_lock(&tdtd->tdtd_replay_list_lock);
572         if (!list_empty(&tdtd->tdtd_replay_list)) {
573                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
574                                  struct distribute_txn_replay_req, dtrq_list);
575                 list_del_init(&dtrq->dtrq_list);
576         }
577         spin_unlock(&tdtd->tdtd_replay_list_lock);
578
579         return dtrq;
580 }
581 EXPORT_SYMBOL(distribute_txn_get_next_req);
582
583 /**
584  * Get next transno in the replay list, because this is the sorted
585  * list, so it will return the transno of next req in the list.
586  *
587  * \param[in] tdtd      distribute txn data where the replay list is
588  *
589  * \retval              the transno of next update in the list
590  */
591 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
592 {
593         struct distribute_txn_replay_req        *dtrq = NULL;
594         __u64                                   transno = 0;
595
596         spin_lock(&tdtd->tdtd_replay_list_lock);
597         if (!list_empty(&tdtd->tdtd_replay_list)) {
598                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
599                                  struct distribute_txn_replay_req, dtrq_list);
600                 transno = dtrq->dtrq_master_transno;
601         }
602         spin_unlock(&tdtd->tdtd_replay_list_lock);
603
604         CDEBUG(D_HA, "%s: Next update transno %llu\n",
605                tdtd->tdtd_lut->lut_obd->obd_name, transno);
606         return transno;
607 }
608 EXPORT_SYMBOL(distribute_txn_get_next_transno);
609
610 struct distribute_txn_replay_req *
611 distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
612                                   __u64 transno)
613 {
614         struct distribute_txn_replay_req *dtrq = NULL;
615         struct distribute_txn_replay_req *iter;
616
617         spin_lock(&tdtd->tdtd_replay_list_lock);
618         list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
619                 if (iter->dtrq_master_transno == transno) {
620                         dtrq = iter;
621                         break;
622                 }
623         }
624         spin_unlock(&tdtd->tdtd_replay_list_lock);
625         return dtrq;
626 }
627
628 bool is_req_replayed_by_update(struct ptlrpc_request *req)
629 {
630         struct lu_target *tgt = class_exp2tgt(req->rq_export);
631         struct distribute_txn_replay_req *dtrq;
632
633         if (tgt->lut_tdtd == NULL)
634                 return false;
635
636         dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
637                                         lustre_msg_get_transno(req->rq_reqmsg));
638         if (dtrq == NULL)
639                 return false;
640
641         return true;
642 }
643 EXPORT_SYMBOL(is_req_replayed_by_update);
644
645 /**
646  * Check if the update of one object is committed
647  *
648  * Check whether the update for the object is committed by checking whether
649  * the correspondent sub exists in the replay req. If it is committed, mark
650  * the committed flag in correspondent the sub thandle.
651  *
652  * \param[in] env       execution environment
653  * \param[in] dtrq      replay request
654  * \param[in] dt_obj    object for the update
655  * \param[in] top_th    top thandle
656  * \param[in] sub_th    sub thandle which the update belongs to
657  *
658  * \retval              1 if the update is not committed.
659  * \retval              0 if the update is committed.
660  * \retval              negative errno if some other failures happen.
661  */
662 static int update_is_committed(const struct lu_env *env,
663                                struct distribute_txn_replay_req *dtrq,
664                                struct dt_object *dt_obj,
665                                struct top_thandle *top_th,
666                                struct sub_thandle *st)
667 {
668         struct seq_server_site  *seq_site;
669         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
670         struct distribute_txn_replay_req_sub    *dtrqs;
671         __u32                   mdt_index;
672         ENTRY;
673
674         if (st->st_sub_th != NULL)
675                 RETURN(1);
676
677         if (st->st_committed)
678                 RETURN(0);
679
680         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
681         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
682                 mdt_index = fid_oid(fid);
683         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
684                 mdt_index = seq_site->ss_node_id;
685         } else {
686                 struct lu_server_fld *fld;
687                 struct lu_seq_range range = {0};
688                 int rc;
689
690                 fld = seq_site->ss_server_fld;
691                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
692                 LASSERT(fld->lsf_seq_lookup != NULL);
693                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
694                                          &range);
695                 if (rc < 0)
696                         RETURN(rc);
697                 mdt_index = range.lsr_index;
698         }
699
700         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
701         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
702                 st->st_committed = 1;
703                 if (dtrqs != NULL) {
704                         struct sub_thandle_cookie *stc;
705                         struct sub_thandle_cookie *tmp;
706
707                         list_for_each_entry_safe(stc, tmp,
708                                                  &dtrqs->dtrqs_cookie_list,
709                                                  stc_list)
710                                 list_move(&stc->stc_list, &st->st_cookie_list);
711                 }
712                 RETURN(0);
713         }
714
715         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
716                mdt_index);
717
718         RETURN(1);
719 }
720
721 /**
722  * Implementation of different update methods for update recovery.
723  *
724  * These following functions update_recovery_$(update_name) implement
725  * different updates recovery methods. They will extract the parameters
726  * from the common parameters area and call correspondent dt API to redo
727  * the update.
728  *
729  * \param[in] env       execution environment
730  * \param[in] op        update operation to be replayed
731  * \param[in] params    common update parameters which holds all parameters
732  *                      of the operation
733  * \param[in] th        transaction handle
734  * \param[in] declare   indicate it will do declare or real execution, true
735  *                      means declare, false means real execution
736  *
737  * \retval              0 if it succeeds.
738  * \retval              negative errno if it fails.
739  */
740 static int update_recovery_create(const struct lu_env *env,
741                                   struct dt_object *dt_obj,
742                                   const struct update_op *op,
743                                   const struct update_params *params,
744                                   struct thandle_exec_args *ta,
745                                   struct thandle *th)
746 {
747         struct update_thread_info *uti = update_env_info(env);
748         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
749         struct lu_attr          *attr = &uti->uti_attr;
750         struct obdo             *wobdo;
751         struct obdo             *lobdo = &uti->uti_obdo;
752         struct dt_object_format dof;
753         __u16                   size;
754         unsigned int            param_count;
755         int rc;
756         ENTRY;
757
758         if (dt_object_exists(dt_obj))
759                 RETURN(-EEXIST);
760
761         param_count = lur->lur_update_rec.ur_param_count;
762         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
763                                             param_count, &size);
764         if (wobdo == NULL)
765                 RETURN(-EIO);
766         if (size != sizeof(*wobdo))
767                 RETURN(-EIO);
768
769         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
770                 lustre_swab_obdo(wobdo);
771
772         lustre_get_wire_obdo(NULL, lobdo, wobdo);
773         la_from_obdo(attr, lobdo, lobdo->o_valid);
774
775         dof.dof_type = dt_mode_to_dft(attr->la_mode);
776
777         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
778                            ta, th, NULL, 0);
779
780         RETURN(rc);
781 }
782
783 static int update_recovery_destroy(const struct lu_env *env,
784                                    struct dt_object *dt_obj,
785                                    const struct update_op *op,
786                                    const struct update_params *params,
787                                    struct thandle_exec_args *ta,
788                                    struct thandle *th)
789 {
790         int rc;
791         ENTRY;
792
793         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
794
795         RETURN(rc);
796 }
797
798 static int update_recovery_ref_add(const struct lu_env *env,
799                                    struct dt_object *dt_obj,
800                                    const struct update_op *op,
801                                    const struct update_params *params,
802                                    struct thandle_exec_args *ta,
803                                    struct thandle *th)
804 {
805         int rc;
806         ENTRY;
807
808         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
809
810         RETURN(rc);
811 }
812
813 static int update_recovery_ref_del(const struct lu_env *env,
814                                    struct dt_object *dt_obj,
815                                    const struct update_op *op,
816                                    const struct update_params *params,
817                                    struct thandle_exec_args *ta,
818                                    struct thandle *th)
819 {
820         int rc;
821         ENTRY;
822
823         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
824
825         RETURN(rc);
826 }
827
828 static int update_recovery_attr_set(const struct lu_env *env,
829                                     struct dt_object *dt_obj,
830                                     const struct update_op *op,
831                                     const struct update_params *params,
832                                     struct thandle_exec_args *ta,
833                                     struct thandle *th)
834 {
835         struct update_thread_info *uti = update_env_info(env);
836         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
837         struct obdo     *wobdo;
838         struct obdo     *lobdo = &uti->uti_obdo;
839         struct lu_attr  *attr = &uti->uti_attr;
840         __u16           size;
841         unsigned int    param_count;
842         int             rc;
843         ENTRY;
844
845         param_count = lur->lur_update_rec.ur_param_count;
846         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
847                                             param_count, &size);
848         if (wobdo == NULL)
849                 RETURN(-EIO);
850         if (size != sizeof(*wobdo))
851                 RETURN(-EIO);
852
853         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
854                 lustre_swab_obdo(wobdo);
855
856         lustre_get_wire_obdo(NULL, lobdo, wobdo);
857         la_from_obdo(attr, lobdo, lobdo->o_valid);
858
859         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
860
861         RETURN(rc);
862 }
863
864 static int update_recovery_xattr_set(const struct lu_env *env,
865                                      struct dt_object *dt_obj,
866                                      const struct update_op *op,
867                                      const struct update_params *params,
868                                      struct thandle_exec_args *ta,
869                                      struct thandle *th)
870 {
871         struct update_thread_info *uti = update_env_info(env);
872         char            *buf;
873         char            *name;
874         int             fl;
875         __u16           size;
876         __u32           param_count;
877         int             rc;
878         ENTRY;
879
880         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
881         name = update_params_get_param_buf(params,
882                                            op->uop_params_off[0],
883                                            param_count, &size);
884         if (name == NULL)
885                 RETURN(-EIO);
886
887         buf = update_params_get_param_buf(params,
888                                           op->uop_params_off[1],
889                                           param_count, &size);
890         if (buf == NULL)
891                 RETURN(-EIO);
892
893         uti->uti_buf.lb_buf = buf;
894         uti->uti_buf.lb_len = (size_t)size;
895
896         buf = update_params_get_param_buf(params, op->uop_params_off[2],
897                                           param_count, &size);
898         if (buf == NULL)
899                 RETURN(-EIO);
900         if (size != sizeof(fl))
901                 RETURN(-EIO);
902
903         fl = le32_to_cpu(*(int *)buf);
904
905         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
906                               NULL, 0);
907
908         RETURN(rc);
909 }
910
911 static int update_recovery_index_insert(const struct lu_env *env,
912                                         struct dt_object *dt_obj,
913                                         const struct update_op *op,
914                                         const struct update_params *params,
915                                         struct thandle_exec_args *ta,
916                                         struct thandle *th)
917 {
918         struct update_thread_info *uti = update_env_info(env);
919         struct lu_fid           *fid;
920         char                    *name;
921         __u32                   param_count;
922         __u32                   *ptype;
923         __u32                   type;
924         __u16                   size;
925         int rc;
926         ENTRY;
927
928         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
929         name = update_params_get_param_buf(params, op->uop_params_off[0],
930                                            param_count, &size);
931         if (name == NULL)
932                 RETURN(-EIO);
933
934         fid = update_params_get_param_buf(params, op->uop_params_off[1],
935                                           param_count, &size);
936         if (fid == NULL)
937                 RETURN(-EIO);
938         if (size != sizeof(*fid))
939                 RETURN(-EIO);
940
941         fid_le_to_cpu(fid, fid);
942
943         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
944                                             param_count, &size);
945         if (ptype == NULL)
946                 RETURN(-EIO);
947         if (size != sizeof(*ptype))
948                 RETURN(-EIO);
949         type = le32_to_cpu(*ptype);
950
951         if (dt_try_as_dir(env, dt_obj) == 0)
952                 RETURN(-ENOTDIR);
953
954         uti->uti_rec.rec_fid = fid;
955         uti->uti_rec.rec_type = type;
956
957         rc = out_tx_index_insert(env, dt_obj,
958                                  (const struct dt_rec *)&uti->uti_rec,
959                                  (const struct dt_key *)name, ta, th,
960                                  NULL, 0);
961
962         RETURN(rc);
963 }
964
965 static int update_recovery_index_delete(const struct lu_env *env,
966                                         struct dt_object *dt_obj,
967                                         const struct update_op *op,
968                                         const struct update_params *params,
969                                         struct thandle_exec_args *ta,
970                                         struct thandle *th)
971 {
972         struct update_thread_info *uti = update_env_info(env);
973         __u32   param_count;
974         char    *name;
975         __u16   size;
976         int     rc;
977         ENTRY;
978
979         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
980         name = update_params_get_param_buf(params, op->uop_params_off[0],
981                                            param_count, &size);
982         if (name == NULL)
983                 RETURN(-EIO);
984
985         if (dt_try_as_dir(env, dt_obj) == 0)
986                 RETURN(-ENOTDIR);
987
988         rc = out_tx_index_delete(env, dt_obj,
989                                  (const struct dt_key *)name, ta, th, NULL, 0);
990
991         RETURN(rc);
992 }
993
994 static int update_recovery_write(const struct lu_env *env,
995                                  struct dt_object *dt_obj,
996                                  const struct update_op *op,
997                                  const struct update_params *params,
998                                  struct thandle_exec_args *ta,
999                                  struct thandle *th)
1000 {
1001         struct update_thread_info *uti = update_env_info(env);
1002         char            *buf;
1003         __u32           param_count;
1004         __u64           pos;
1005         __u16           size;
1006         int rc;
1007         ENTRY;
1008
1009         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1010         buf = update_params_get_param_buf(params, op->uop_params_off[0],
1011                                           param_count, &size);
1012         if (buf == NULL)
1013                 RETURN(-EIO);
1014
1015         uti->uti_buf.lb_buf = buf;
1016         uti->uti_buf.lb_len = size;
1017
1018         buf = update_params_get_param_buf(params, op->uop_params_off[1],
1019                                           param_count, &size);
1020         if (buf == NULL)
1021                 RETURN(-EIO);
1022
1023         pos = le64_to_cpu(*(__u64 *)buf);
1024
1025         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
1026                           ta, th, NULL, 0);
1027
1028         RETURN(rc);
1029 }
1030
1031 static int update_recovery_xattr_del(const struct lu_env *env,
1032                                      struct dt_object *dt_obj,
1033                                      const struct update_op *op,
1034                                      const struct update_params *params,
1035                                      struct thandle_exec_args *ta,
1036                                      struct thandle *th)
1037 {
1038         struct update_thread_info *uti = update_env_info(env);
1039         __u32   param_count;
1040         char    *name;
1041         __u16   size;
1042         int     rc;
1043         ENTRY;
1044
1045         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
1046         name = update_params_get_param_buf(params, op->uop_params_off[0],
1047                                            param_count, &size);
1048         if (name == NULL)
1049                 RETURN(-EIO);
1050
1051         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
1052
1053         RETURN(rc);
1054 }
1055
1056 /**
1057  * Update session information
1058  *
1059  * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
1060  * can be called correctly during update replay.
1061  *
1062  * \param[in] env       execution environment.
1063  * \param[in] tdtd      distribute data structure of the recovering tgt.
1064  * \param[in] th        thandle of this update replay.
1065  * \param[in] master_th master sub thandle.
1066  * \param[in] ta_arg    the tx arg structure to hold the update for updating
1067  *                      reply data.
1068  */
1069 static void update_recovery_update_ses(struct lu_env *env,
1070                                       struct target_distribute_txn_data *tdtd,
1071                                       struct thandle *th,
1072                                       struct thandle *master_th,
1073                                       struct distribute_txn_replay_req *dtrq,
1074                                       struct tx_arg *ta_arg)
1075 {
1076         struct tgt_session_info *tsi;
1077         struct lu_target        *lut = tdtd->tdtd_lut;
1078         struct obd_export       *export;
1079         struct cfs_hash         *hash;
1080         struct top_thandle      *top_th;
1081         struct lsd_reply_data   *lrd;
1082         size_t                  size;
1083
1084         tsi = tgt_ses_info(env);
1085         if (tsi->tsi_exp != NULL)
1086                 return;
1087
1088         size = ta_arg->u.write.buf.lb_len;
1089         lrd = ta_arg->u.write.buf.lb_buf;
1090         if (size != sizeof(*lrd) || lrd == NULL)
1091                 return;
1092
1093         lrd->lrd_transno         = le64_to_cpu(lrd->lrd_transno);
1094         lrd->lrd_xid             = le64_to_cpu(lrd->lrd_xid);
1095         lrd->lrd_data            = le64_to_cpu(lrd->lrd_data);
1096         lrd->lrd_result          = le32_to_cpu(lrd->lrd_result);
1097         lrd->lrd_client_gen      = le32_to_cpu(lrd->lrd_client_gen);
1098
1099         CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
1100         if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
1101                 return;
1102
1103         hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
1104         if (hash == NULL)
1105                 return;
1106
1107         export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
1108         if (export == NULL) {
1109                 cfs_hash_putref(hash);
1110                 return;
1111         }
1112
1113         tsi->tsi_exp = export;
1114         tsi->tsi_xid = lrd->lrd_xid;
1115         tsi->tsi_opdata = lrd->lrd_data;
1116         tsi->tsi_result = lrd->lrd_result;
1117         tsi->tsi_client_gen = lrd->lrd_client_gen;
1118         dtrq->dtrq_xid = lrd->lrd_xid;
1119         top_th = container_of(th, struct top_thandle, tt_super);
1120         top_th->tt_master_sub_thandle = master_th;
1121         cfs_hash_putref(hash);
1122 }
1123
1124 /**
1125  * Execute updates in the update replay records
1126  *
1127  * Declare distribute txn replay by update records and add the updates
1128  * to the execution list. Note: it will check if the update has been
1129  * committed, and only execute the updates if it is not committed to
1130  * disk.
1131  *
1132  * \param[in] env       execution environment
1133  * \param[in] tdtd      distribute txn replay data which hold all of replay
1134  *                      reqs and all replay parameters.
1135  * \param[in] dtrq      distribute transaction replay req.
1136  * \param[in] ta        thandle execute args.
1137  *
1138  * \retval              0 if declare succeeds.
1139  * \retval              negative errno if declare fails.
1140  */
1141 static int update_recovery_exec(const struct lu_env *env,
1142                                 struct target_distribute_txn_data *tdtd,
1143                                 struct distribute_txn_replay_req *dtrq,
1144                                 struct thandle_exec_args *ta)
1145 {
1146         struct llog_update_record *lur = dtrq->dtrq_lur;
1147         struct update_records   *records = &lur->lur_update_rec;
1148         struct update_ops       *ops = &records->ur_ops;
1149         struct update_params    *params = update_records_get_params(records);
1150         struct top_thandle      *top_th = container_of(ta->ta_handle,
1151                                                        struct top_thandle,
1152                                                        tt_super);
1153         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1154         struct update_op        *op;
1155         unsigned int            i;
1156         int                     rc = 0;
1157         ENTRY;
1158
1159         /* These records have been swabbed in llog_cat_process() */
1160         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1161              i++, op = update_op_next_op(op)) {
1162                 struct lu_fid           *fid = &op->uop_fid;
1163                 struct dt_object        *dt_obj;
1164                 struct dt_object        *sub_dt_obj;
1165                 struct dt_device        *sub_dt;
1166                 struct sub_thandle      *st;
1167
1168                 if (op->uop_type == OUT_NOOP)
1169                         continue;
1170
1171                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1172                 if (IS_ERR(dt_obj)) {
1173                         rc = PTR_ERR(dt_obj);
1174                         if (rc == -EREMCHG)
1175                                 LCONSOLE_WARN("%.16s: hit invalid OI mapping "
1176                                               "for "DFID" during recovering, "
1177                                               "that may because auto scrub is "
1178                                               "disabled on related MDT, and "
1179                                               "will cause recovery failure. "
1180                                               "Please enable auto scrub and "
1181                                               "retry the recovery.\n",
1182                                               tdtd->tdtd_lut->lut_obd->obd_name,
1183                                               PFID(fid));
1184
1185                         break;
1186                 }
1187                 sub_dt_obj = dt_object_child(dt_obj);
1188
1189                 /* Create sub thandle if not */
1190                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1191                 st = lookup_sub_thandle(tmt, sub_dt);
1192                 if (st == NULL) {
1193                         st = create_sub_thandle(tmt, sub_dt);
1194                         if (IS_ERR(st))
1195                                 GOTO(next, rc = PTR_ERR(st));
1196                 }
1197
1198                 /* check if updates on the OSD/OSP are committed */
1199                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1200                 if (rc == 0)
1201                         /* If this is committed, goto next */
1202                         goto next;
1203
1204                 if (rc < 0)
1205                         GOTO(next, rc);
1206
1207                 /* Create thandle for sub thandle if needed */
1208                 if (st->st_sub_th == NULL) {
1209                         rc = sub_thandle_trans_create(env, top_th, st);
1210                         if (rc != 0)
1211                                 GOTO(next, rc);
1212                 }
1213
1214                 CDEBUG(D_HA, "replay %uth update\n", i);
1215                 switch (op->uop_type) {
1216                 case OUT_CREATE:
1217                         rc = update_recovery_create(env, sub_dt_obj,
1218                                                     op, params, ta,
1219                                                     st->st_sub_th);
1220                         break;
1221                 case OUT_DESTROY:
1222                         rc = update_recovery_destroy(env, sub_dt_obj,
1223                                                      op, params, ta,
1224                                                      st->st_sub_th);
1225                         break;
1226                 case OUT_REF_ADD:
1227                         rc = update_recovery_ref_add(env, sub_dt_obj,
1228                                                      op, params, ta,
1229                                                      st->st_sub_th);
1230                         break;
1231                 case OUT_REF_DEL:
1232                         rc = update_recovery_ref_del(env, sub_dt_obj,
1233                                                      op, params, ta,
1234                                                      st->st_sub_th);
1235                         break;
1236                 case OUT_ATTR_SET:
1237                         rc = update_recovery_attr_set(env, sub_dt_obj,
1238                                                       op, params, ta,
1239                                                       st->st_sub_th);
1240                         break;
1241                 case OUT_XATTR_SET:
1242                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1243                                                        op, params, ta,
1244                                                        st->st_sub_th);
1245                         break;
1246                 case OUT_INDEX_INSERT:
1247                         rc = update_recovery_index_insert(env, sub_dt_obj,
1248                                                           op, params, ta,
1249                                                           st->st_sub_th);
1250                         break;
1251                 case OUT_INDEX_DELETE:
1252                         rc = update_recovery_index_delete(env, sub_dt_obj,
1253                                                           op, params, ta,
1254                                                           st->st_sub_th);
1255                         break;
1256                 case OUT_WRITE:
1257                         rc = update_recovery_write(env, sub_dt_obj,
1258                                                    op, params, ta,
1259                                                    st->st_sub_th);
1260                         break;
1261                 case OUT_XATTR_DEL:
1262                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1263                                                        op, params, ta,
1264                                                        st->st_sub_th);
1265                         break;
1266                 default:
1267                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1268                         rc = -EINVAL;
1269                         break;
1270                 }
1271 next:
1272                 dt_object_put(env, dt_obj);
1273                 if (rc < 0)
1274                         break;
1275         }
1276
1277         ta->ta_handle->th_result = rc;
1278         RETURN(rc);
1279 }
1280
1281 /**
1282  * redo updates on MDT if needed.
1283  *
1284  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1285  * this function to replay distribute txn updates on all MDTs. It only replay
1286  * updates on the MDT where the update record is missing.
1287  *
1288  * If the update already exists on the MDT, then it does not need replay the
1289  * updates on that MDT, and only mark the sub transaction has been committed
1290  * there.
1291  *
1292  * \param[in] env       execution environment
1293  * \param[in] tdtd      target distribute txn data, which holds the replay list
1294  *                      and all parameters needed by replay process.
1295  * \param[in] dtrq      distribute txn replay req.
1296  *
1297  * \retval              0 if replay succeeds.
1298  * \retval              negative errno if replay failes.
1299  */
1300 int distribute_txn_replay_handle(struct lu_env *env,
1301                                  struct target_distribute_txn_data *tdtd,
1302                                  struct distribute_txn_replay_req *dtrq)
1303 {
1304         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1305         struct thandle_exec_args *ta;
1306         struct lu_context       session_env;
1307         struct thandle          *th = NULL;
1308         struct top_thandle      *top_th;
1309         struct top_multiple_thandle *tmt;
1310         struct thandle_update_records *tur = NULL;
1311         int                     i;
1312         int                     rc = 0;
1313         ENTRY;
1314
1315         /* initialize session, it is needed for the handler of target */
1316         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1317         if (rc) {
1318                 CERROR("%s: failure to initialize session: rc = %d\n",
1319                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1320                 RETURN(rc);
1321         }
1322         lu_context_enter(&session_env);
1323         env->le_ses = &session_env;
1324         lu_env_refill(env);
1325         update_records_dump(records, D_HA, true);
1326         th = top_trans_create(env, NULL);
1327         if (IS_ERR(th))
1328                 GOTO(exit_session, rc = PTR_ERR(th));
1329
1330         ta = &update_env_info(env)->uti_tea;
1331         ta->ta_argno = 0;
1332
1333         update_env_info(env)->uti_dtrq = dtrq;
1334         /* Create distribute transaction structure for this top thandle */
1335         top_th = container_of(th, struct top_thandle, tt_super);
1336         rc = top_trans_create_tmt(env, top_th);
1337         if (rc < 0)
1338                 GOTO(stop_trans, rc);
1339
1340         th->th_dev = tdtd->tdtd_dt;
1341         ta->ta_handle = th;
1342
1343         /* check if the distribute transaction has been committed */
1344         tmt = top_th->tt_multiple_thandle;
1345         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1346         tmt->tmt_batchid = dtrq->dtrq_batchid;
1347         tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
1348
1349         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1350                 tmt->tmt_committed = 1;
1351
1352         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1353         if (rc < 0)
1354                 GOTO(stop_trans, rc);
1355
1356         /* If no updates are needed to be replayed, then mark this records as
1357          * committed, so commit thread distribute_txn_commit_thread() will
1358          * delete the record */
1359         if (ta->ta_argno == 0)
1360                 tmt->tmt_committed = 1;
1361
1362         tur = &update_env_info(env)->uti_tur;
1363         tur->tur_update_records = dtrq->dtrq_lur;
1364         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1365         tur->tur_update_params = NULL;
1366         tur->tur_update_param_count = 0;
1367         tmt->tmt_update_records = tur;
1368
1369         distribute_txn_insert_by_batchid(tmt);
1370         rc = top_trans_start(env, NULL, th);
1371         if (rc < 0)
1372                 GOTO(stop_trans, rc);
1373
1374         for (i = 0; i < ta->ta_argno; i++) {
1375                 struct tx_arg           *ta_arg;
1376                 struct dt_object        *dt_obj;
1377                 struct dt_device        *sub_dt;
1378                 struct sub_thandle      *st;
1379
1380                 ta_arg = ta->ta_args[i];
1381                 dt_obj = ta_arg->object;
1382
1383                 LASSERT(tmt->tmt_committed == 0);
1384                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1385                 st = lookup_sub_thandle(tmt, sub_dt);
1386
1387                 LASSERT(st != NULL);
1388                 LASSERT(st->st_sub_th != NULL);
1389                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1390                                              ta->ta_args[i]);
1391
1392                 /* If the update is to update the reply data, then
1393                  * we need set the session information, so
1394                  * tgt_last_rcvd_update() can be called correctly */
1395                 if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
1396                         update_recovery_update_ses(env, tdtd, th,
1397                                                    st->st_sub_th, dtrq, ta_arg);
1398
1399                 if (unlikely(rc < 0)) {
1400                         CDEBUG(D_HA, "error during execution of #%u from"
1401                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1402                                ta->ta_args[i]->line, rc);
1403                         while (--i > 0) {
1404                                 if (ta->ta_args[i]->undo_fn != NULL) {
1405                                         dt_obj = ta->ta_args[i]->object;
1406                                         sub_dt =
1407                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1408                                         st = lookup_sub_thandle(tmt, sub_dt);
1409                                         LASSERT(st != NULL);
1410                                         LASSERT(st->st_sub_th != NULL);
1411
1412                                         ta->ta_args[i]->undo_fn(env,
1413                                                                st->st_sub_th,
1414                                                                ta->ta_args[i]);
1415                                 } else {
1416                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1417                                              dt_obd_name(ta->ta_handle->th_dev),
1418                                                ta->ta_args[i]->file,
1419                                                ta->ta_args[i]->line, -ENOTSUPP);
1420                                 }
1421                         }
1422                         break;
1423                 }
1424                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1425                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1426         }
1427
1428 stop_trans:
1429         if (rc < 0)
1430                 th->th_result = rc;
1431         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1432         for (i = 0; i < ta->ta_argno; i++) {
1433                 if (ta->ta_args[i]->object != NULL) {
1434                         dt_object_put(env, ta->ta_args[i]->object);
1435                         ta->ta_args[i]->object = NULL;
1436                 }
1437         }
1438
1439         if (tur != NULL)
1440                 tur->tur_update_records = NULL;
1441
1442         if (tgt_ses_info(env)->tsi_exp != NULL) {
1443                 class_export_put(tgt_ses_info(env)->tsi_exp);
1444                 tgt_ses_info(env)->tsi_exp = NULL;
1445         }
1446 exit_session:
1447         lu_context_exit(&session_env);
1448         lu_context_fini(&session_env);
1449         RETURN(rc);
1450 }
1451 EXPORT_SYMBOL(distribute_txn_replay_handle);