Whamcloud - gitweb
LU-6837 update: re-lookup the dtrq in the replay list.
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
 * This file implements the methods that handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. These replays come either from client replay requests or
 * from update replay records (for distributed transactions) in the update
 * log. For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * MDTs. For each distributed operation it checks the updates on all MDTs;
 * if some update records are missing on some MDTs, the replay thread redoes
 * the updates on those MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
49 #include <obd.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
52
53 /**
54  * Lookup distribute_txn_replay req
55  *
56  * Lookup distribute_txn_replay in the replay list by batchid.
57  * It is assumed the list has been locked before calling this function.
58  *
59  * \param[in] tdtd      distribute_txn_data, which holds the replay
60  *                      list.
61  * \param[in] batchid   batchid used by lookup.
62  *
63  * \retval              pointer of the replay if succeeds.
64  * \retval              NULL if can not find it.
65  */
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
68 {
69         struct distribute_txn_replay_req        *tmp;
70         struct distribute_txn_replay_req        *dtrq = NULL;
71
72         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73                 if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
74                         dtrq = tmp;
75                         break;
76                 }
77         }
78         return dtrq;
79 }
80
81 /**
82  * insert distribute txn replay req
83  *
84  * Insert distribute txn replay to the replay list, and it assumes the
85  * list has been looked. Note: the replay list is a sorted list, which
86  * is sorted by master transno. It is assumed the replay list has been
87  * locked before calling this function.
88  *
89  * \param[in] tdtd      target distribute txn data where replay list is
90  * \param[in] new       distribute txn replay to be inserted
91  *
92  * \retval              0 if insertion succeeds
93  * \retval              EEXIST if the dtrq already exists
94  */
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96                         struct distribute_txn_replay_req *new)
97 {
98         struct distribute_txn_replay_req *iter;
99
100         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
101                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
102                     new->dtrq_lur->lur_update_rec.ur_master_transno)
103                         continue;
104
105                 /* If there are mulitple replay req with same transno, then
106                  * sort them with batchid */
107                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
108                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
109                     iter->dtrq_lur->lur_update_rec.ur_batchid ==
110                     new->dtrq_lur->lur_update_rec.ur_batchid)
111                         return -EEXIST;
112
113                 /* If there are mulitple replay req with same transno, then
114                  * sort them with batchid */
115                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
116                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
117                     iter->dtrq_lur->lur_update_rec.ur_batchid >
118                     new->dtrq_lur->lur_update_rec.ur_batchid)
119                         continue;
120
121                 list_add(&new->dtrq_list, &iter->dtrq_list);
122                 break;
123         }
124
125         if (list_empty(&new->dtrq_list))
126                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
127
128         return 0;
129 }
130
131 /**
132  * create distribute txn replay req
133  *
134  * Allocate distribute txn replay req according to the update records.
135  *
136  * \param[in] tdtd      target distribute txn data where replay list is.
137  * \param[in] record    update records from the update log.
138  *
139  * \retval              the pointer of distribute txn replay req if
140  *                      the creation succeeds.
141  * \retval              NULL if the creation fails.
142  */
143 static struct distribute_txn_replay_req *
144 dtrq_create(struct llog_update_record *lur)
145 {
146         struct distribute_txn_replay_req *new;
147
148         OBD_ALLOC_PTR(new);
149         if (new == NULL)
150                 RETURN(ERR_PTR(-ENOMEM));
151
152         new->dtrq_lur_size = llog_update_record_size(lur);
153         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
154         if (new->dtrq_lur == NULL) {
155                 OBD_FREE_PTR(new);
156                 RETURN(ERR_PTR(-ENOMEM));
157         }
158
159         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
160
161         spin_lock_init(&new->dtrq_sub_list_lock);
162         INIT_LIST_HEAD(&new->dtrq_sub_list);
163         INIT_LIST_HEAD(&new->dtrq_list);
164
165         RETURN(new);
166 }
167
168 /**
169  * Lookup distribute sub replay
170  *
171  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
172  * mdt_index.
173  *
174  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
175  * \param[in] mdt_index                 the mdt_index as the key of lookup
176  *
177  * \retval              the pointer of sub replay if it can be found.
178  * \retval              NULL if it can not find.
179  */
180 struct distribute_txn_replay_req_sub *
181 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
182 {
183         struct distribute_txn_replay_req_sub *dtrqs = NULL;
184         struct distribute_txn_replay_req_sub *tmp;
185
186         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
187                 if (tmp->dtrqs_mdt_index == mdt_index) {
188                         dtrqs = tmp;
189                         break;
190                 }
191         }
192         return dtrqs;
193 }
194
195 /**
196  * Try to add cookie to sub distribute txn request
197  *
198  * Check if the update log cookie has been added to the request, if not,
199  * add it to the dtrqs_cookie_list.
200  *
201  * \param[in] dtrqs     sub replay req where cookies to be added.
202  * \param[in] cookie    cookie to be added.
203  *
204  * \retval              0 if the cookie is adding succeeds.
205  * \retval              negative errno if adding fails.
206  */
207 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
208                                struct llog_cookie *cookie)
209 {
210         struct sub_thandle_cookie *new;
211
212         OBD_ALLOC_PTR(new);
213         if (new == NULL)
214                 return -ENOMEM;
215
216         INIT_LIST_HEAD(&new->stc_list);
217         new->stc_cookie = *cookie;
218         /* Note: only single thread will access one sub_request each time,
219          * so no need lock here */
220         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
221
222         return 0;
223 }
224
225 /**
226  * Insert distribute txn sub req replay
227  *
228  * Allocate sub replay req and insert distribute txn replay list.
229  *
230  * \param[in] dtrq      d to be added
231  * \param[in] cookie    the cookie of the update record
232  * \param[in] mdt_index the mdt_index of the update record
233  *
234  * \retval              0 if the adding succeeds.
235  * \retval              negative errno if the adding fails.
236  */
237 static int
238 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
239                            struct llog_cookie *cookie,
240                            __u32 mdt_index)
241 {
242         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
243         struct distribute_txn_replay_req_sub    *new;
244         int                                     rc;
245         ENTRY;
246
247         spin_lock(&dtrq->dtrq_sub_list_lock);
248         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
249         spin_unlock(&dtrq->dtrq_sub_list_lock);
250         if (dtrqs != NULL) {
251                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
252                 RETURN(0);
253         }
254
255         OBD_ALLOC_PTR(new);
256         if (new == NULL)
257                 RETURN(-ENOMEM);
258
259         INIT_LIST_HEAD(&new->dtrqs_list);
260         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
261         new->dtrqs_mdt_index = mdt_index;
262         spin_lock(&dtrq->dtrq_sub_list_lock);
263         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
264         if (dtrqs == NULL) {
265                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
266                 dtrqs = new;
267         } else {
268                 OBD_FREE_PTR(new);
269         }
270         spin_unlock(&dtrq->dtrq_sub_list_lock);
271
272         rc = dtrq_sub_add_cookie(dtrqs, cookie);
273
274         RETURN(rc);
275 }
276
277 /**
278  * append updates to the current replay updates
279  *
280  * Append more updates to the existent replay update. And this is only
281  * used when combining mulitple updates into one large updates during
282  * replay.
283  *
284  * \param[in] dtrq      the update replay request where the new update
285  *                      records will be added.
286  * \param[in] lur       the new update record.
287  *
288  * \retval              0 if appending succeeds.
289  * \retval              negative errno if appending fails.
290  */
291 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
292                                struct update_records *record)
293 {
294         struct llog_update_record *new_lur;
295         size_t lur_size = dtrq->dtrq_lur_size;
296         void *ptr;
297         ENTRY;
298
299         /* Because several threads might retrieve the same records from
300          * different targets, and we only need one copy of records. So
301          * we will check if the records is in the next one, if not, just
302          * skip it */
303         spin_lock(&dtrq->dtrq_sub_list_lock);
304         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
305                 spin_unlock(&dtrq->dtrq_sub_list_lock);
306                 RETURN(0);
307         }
308         dtrq->dtrq_lur->lur_update_rec.ur_index++;
309         spin_unlock(&dtrq->dtrq_sub_list_lock);
310
311         lur_size += update_records_size(record);
312         OBD_ALLOC_LARGE(new_lur, lur_size);
313         if (new_lur == NULL) {
314                 spin_lock(&dtrq->dtrq_sub_list_lock);
315                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
316                 spin_unlock(&dtrq->dtrq_sub_list_lock);
317                 RETURN(-ENOMEM);
318         }
319
320         /* Copy the old and new records to the new allocated buffer */
321         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
322         ptr = (char *)&new_lur->lur_update_rec +
323                 update_records_size(&new_lur->lur_update_rec);
324         memcpy(ptr, &record->ur_ops,
325                update_records_size(record) -
326                offsetof(struct update_records, ur_ops));
327
328         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
329         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
330         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
331
332         /* Replace the records */
333         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
334         dtrq->dtrq_lur = new_lur;
335         dtrq->dtrq_lur_size = lur_size;
336         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
337         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
338         RETURN(0);
339 }
340
341 /**
342  * Insert update records to the replay list.
343  *
344  * Allocate distribute txn replay req and insert it into the replay
345  * list, then insert the update records into the replay req.
346  *
347  * \param[in] tdtd      distribute txn replay data where the replay list
348  *                      is.
349  * \param[in] record    the update record
350  * \param[in] cookie    cookie of the record
351  * \param[in] index     mdt index of the record
352  *
353  * \retval              0 if the adding succeeds.
354  * \retval              negative errno if the adding fails.
355  */
356 int
357 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
358                                      struct llog_update_record *lur,
359                                      struct llog_cookie *cookie,
360                                      __u32 mdt_index)
361 {
362         struct distribute_txn_replay_req *dtrq;
363         struct update_records            *record = &lur->lur_update_rec;
364         int rc = 0;
365         ENTRY;
366
367         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
368                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
369                record->ur_batchid, record->ur_master_transno, mdt_index);
370
371 again:
372         /* First try to build the replay update request with the records */
373         spin_lock(&tdtd->tdtd_replay_list_lock);
374         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
375         spin_unlock(&tdtd->tdtd_replay_list_lock);
376         if (dtrq == NULL) {
377                 /* If the transno in the update record is 0, it means the
378                  * update are from master MDT, and we will use the master
379                  * last committed transno as its batchid. Note: if it got
380                  * the records from the slave later, it needs to update
381                  * the batchid by the transno in slave update log (see below) */
382                 dtrq = dtrq_create(lur);
383                 if (IS_ERR(dtrq))
384                         RETURN(PTR_ERR(dtrq));
385
386                 if (record->ur_master_transno == 0)
387                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
388                                 tdtd->tdtd_lut->lut_last_transno;
389                 spin_lock(&tdtd->tdtd_replay_list_lock);
390                 rc = dtrq_insert(tdtd, dtrq);
391                 spin_unlock(&tdtd->tdtd_replay_list_lock);
392                 if (rc == -EEXIST) {
393                         /* Some one else already add the record */
394                         dtrq_destroy(dtrq);
395                         goto again;
396                 }
397         } else {
398                 struct update_records *dtrq_rec;
399
400                 /* If the master transno in update header is not
401                  * matched with the one in the record, then it means
402                  * the dtrq is originally created by master record,
403                  * and we need update master transno and reposition
404                  * the dtrq(by master transno). */
405                 dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
406                 if (record->ur_master_transno != 0 &&
407                     dtrq_rec->ur_master_transno != record->ur_master_transno) {
408                         dtrq_rec->ur_master_transno = record->ur_master_transno;
409                         spin_lock(&tdtd->tdtd_replay_list_lock);
410                         list_del_init(&dtrq->dtrq_list);
411                         rc = dtrq_insert(tdtd, dtrq);
412                         spin_unlock(&tdtd->tdtd_replay_list_lock);
413                         if (rc < 0)
414                                 return rc;
415                 }
416
417                 /* This is a partial update records, let's try to append
418                  * the record to the current replay request */
419                 if (record->ur_flags & UPDATE_RECORD_CONTINUE)
420                         rc = dtrq_append_updates(dtrq, record);
421         }
422
423         /* Then create and add sub update request */
424         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
425
426         RETURN(rc);
427 }
428 EXPORT_SYMBOL(insert_update_records_to_replay_list);
429
430 /**
431  * Dump updates of distribute txns.
432  *
433  * Output all of recovery updates in the distribute txn list to the
434  * debug log.
435  *
436  * \param[in] tdtd      distribute txn data where all of distribute txn
437  *                      are listed.
438  * \param[in] mask      debug mask
439  */
440 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
441 {
442         struct distribute_txn_replay_req *dtrq;
443
444         spin_lock(&tdtd->tdtd_replay_list_lock);
445         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
446                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
447                                     false);
448         spin_unlock(&tdtd->tdtd_replay_list_lock);
449 }
450 EXPORT_SYMBOL(dtrq_list_dump);
451
452 /**
453  * Destroy distribute txn replay req
454  *
455  * Destroy distribute txn replay req and all of subs.
456  *
457  * \param[in] dtrq      distribute txn replqy req to be destroyed.
458  */
459 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
460 {
461         struct distribute_txn_replay_req_sub    *dtrqs;
462         struct distribute_txn_replay_req_sub    *tmp;
463
464         LASSERT(list_empty(&dtrq->dtrq_list));
465         spin_lock(&dtrq->dtrq_sub_list_lock);
466         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
467                 struct sub_thandle_cookie *stc;
468                 struct sub_thandle_cookie *tmp;
469
470                 list_del(&dtrqs->dtrqs_list);
471                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
472                                          stc_list) {
473                         list_del(&stc->stc_list);
474                         OBD_FREE_PTR(stc);
475                 }
476                 OBD_FREE_PTR(dtrqs);
477         }
478         spin_unlock(&dtrq->dtrq_sub_list_lock);
479
480         if (dtrq->dtrq_lur != NULL)
481                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
482
483         OBD_FREE_PTR(dtrq);
484 }
485 EXPORT_SYMBOL(dtrq_destroy);
486
487 /**
488  * Destroy all of replay req.
489  *
490  * Destroy all of replay req in the replay list.
491  *
492  * \param[in] tdtd      target distribute txn data where the replay list is.
493  */
494 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
495 {
496         struct distribute_txn_replay_req *dtrq;
497         struct distribute_txn_replay_req *tmp;
498
499         spin_lock(&tdtd->tdtd_replay_list_lock);
500         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
501                                  dtrq_list) {
502                 list_del_init(&dtrq->dtrq_list);
503                 dtrq_destroy(dtrq);
504         }
505         spin_unlock(&tdtd->tdtd_replay_list_lock);
506 }
507 EXPORT_SYMBOL(dtrq_list_destroy);
508
509 /**
510  * Get next req in the replay list
511  *
512  * Get next req needs to be replayed, since it is a sorted list
513  * (by master MDT transno)
514  *
515  * \param[in] tdtd      distribute txn data where the replay list is
516  *
517  * \retval              the pointer of update recovery header
518  */
519 struct distribute_txn_replay_req *
520 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
521 {
522         struct distribute_txn_replay_req *dtrq = NULL;
523
524         spin_lock(&tdtd->tdtd_replay_list_lock);
525         if (!list_empty(&tdtd->tdtd_replay_list)) {
526                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
527                                  struct distribute_txn_replay_req, dtrq_list);
528                 list_del_init(&dtrq->dtrq_list);
529         }
530         spin_unlock(&tdtd->tdtd_replay_list_lock);
531
532         return dtrq;
533 }
534 EXPORT_SYMBOL(distribute_txn_get_next_req);
535
536 /**
537  * Get next transno in the replay list, because this is the sorted
538  * list, so it will return the transno of next req in the list.
539  *
540  * \param[in] tdtd      distribute txn data where the replay list is
541  *
542  * \retval              the transno of next update in the list
543  */
544 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
545 {
546         struct distribute_txn_replay_req        *dtrq = NULL;
547         __u64                                   transno = 0;
548
549         spin_lock(&tdtd->tdtd_replay_list_lock);
550         if (!list_empty(&tdtd->tdtd_replay_list)) {
551                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
552                                  struct distribute_txn_replay_req, dtrq_list);
553                 transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
554         }
555         spin_unlock(&tdtd->tdtd_replay_list_lock);
556
557         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
558                tdtd->tdtd_lut->lut_obd->obd_name, transno);
559         return transno;
560 }
561 EXPORT_SYMBOL(distribute_txn_get_next_transno);
562
563 /**
564  * Check if the update of one object is committed
565  *
566  * Check whether the update for the object is committed by checking whether
567  * the correspondent sub exists in the replay req. If it is committed, mark
568  * the committed flag in correspondent the sub thandle.
569  *
570  * \param[in] env       execution environment
571  * \param[in] dtrq      replay request
572  * \param[in] dt_obj    object for the update
573  * \param[in] top_th    top thandle
574  * \param[in] sub_th    sub thandle which the update belongs to
575  *
576  * \retval              1 if the update is not committed.
577  * \retval              0 if the update is committed.
578  * \retval              negative errno if some other failures happen.
579  */
580 static int update_is_committed(const struct lu_env *env,
581                                struct distribute_txn_replay_req *dtrq,
582                                struct dt_object *dt_obj,
583                                struct top_thandle *top_th,
584                                struct sub_thandle *st)
585 {
586         struct seq_server_site  *seq_site;
587         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
588         struct distribute_txn_replay_req_sub    *dtrqs;
589         __u32                   mdt_index;
590         ENTRY;
591
592         if (st->st_sub_th != NULL)
593                 RETURN(1);
594
595         if (st->st_committed)
596                 RETURN(0);
597
598         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
599         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
600                 mdt_index = fid_oid(fid);
601         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
602                 mdt_index = seq_site->ss_node_id;
603         } else {
604                 struct lu_server_fld *fld;
605                 struct lu_seq_range range = {0};
606                 int rc;
607
608                 fld = seq_site->ss_server_fld;
609                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
610                 LASSERT(fld->lsf_seq_lookup != NULL);
611                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
612                                          &range);
613                 if (rc < 0)
614                         RETURN(rc);
615                 mdt_index = range.lsr_index;
616         }
617
618         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
619         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
620                 st->st_committed = 1;
621                 if (dtrqs != NULL) {
622                         struct sub_thandle_cookie *stc;
623                         struct sub_thandle_cookie *tmp;
624
625                         list_for_each_entry_safe(stc, tmp,
626                                                  &dtrqs->dtrqs_cookie_list,
627                                                  stc_list)
628                                 list_move(&stc->stc_list, &st->st_cookie_list);
629                 }
630                 RETURN(0);
631         }
632
633         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
634                mdt_index);
635
636         RETURN(1);
637 }
638
639 /**
640  * Implementation of different update methods for update recovery.
641  *
642  * These following functions update_recovery_$(update_name) implement
643  * different updates recovery methods. They will extract the parameters
644  * from the common parameters area and call correspondent dt API to redo
645  * the update.
646  *
647  * \param[in] env       execution environment
648  * \param[in] op        update operation to be replayed
649  * \param[in] params    common update parameters which holds all parameters
650  *                      of the operation
651  * \param[in] th        transaction handle
652  * \param[in] declare   indicate it will do declare or real execution, true
653  *                      means declare, false means real execution
654  *
655  * \retval              0 if it succeeds.
656  * \retval              negative errno if it fails.
657  */
658 static int update_recovery_create(const struct lu_env *env,
659                                   struct dt_object *dt_obj,
660                                   const struct update_op *op,
661                                   const struct update_params *params,
662                                   struct thandle_exec_args *ta,
663                                   struct thandle *th)
664 {
665         struct update_thread_info *uti = update_env_info(env);
666         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
667         struct lu_attr          *attr = &uti->uti_attr;
668         struct obdo             *wobdo;
669         struct obdo             *lobdo = &uti->uti_obdo;
670         struct dt_object_format dof;
671         __u16                   size;
672         unsigned int            param_count;
673         int rc;
674         ENTRY;
675
676         if (dt_object_exists(dt_obj))
677                 RETURN(-EEXIST);
678
679         param_count = lur->lur_update_rec.ur_param_count;
680         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
681                                             param_count, &size);
682         if (wobdo == NULL)
683                 RETURN(-EIO);
684         if (size != sizeof(*wobdo))
685                 RETURN(-EIO);
686
687         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
688                 lustre_swab_obdo(wobdo);
689
690         lustre_get_wire_obdo(NULL, lobdo, wobdo);
691         la_from_obdo(attr, lobdo, lobdo->o_valid);
692
693         dof.dof_type = dt_mode_to_dft(attr->la_mode);
694
695         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
696                            ta, th, NULL, 0);
697
698         RETURN(rc);
699 }
700
701 static int update_recovery_destroy(const struct lu_env *env,
702                                    struct dt_object *dt_obj,
703                                    const struct update_op *op,
704                                    const struct update_params *params,
705                                    struct thandle_exec_args *ta,
706                                    struct thandle *th)
707 {
708         int rc;
709         ENTRY;
710
711         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
712
713         RETURN(rc);
714 }
715
716 static int update_recovery_ref_add(const struct lu_env *env,
717                                    struct dt_object *dt_obj,
718                                    const struct update_op *op,
719                                    const struct update_params *params,
720                                    struct thandle_exec_args *ta,
721                                    struct thandle *th)
722 {
723         int rc;
724         ENTRY;
725
726         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
727
728         RETURN(rc);
729 }
730
731 static int update_recovery_ref_del(const struct lu_env *env,
732                                    struct dt_object *dt_obj,
733                                    const struct update_op *op,
734                                    const struct update_params *params,
735                                    struct thandle_exec_args *ta,
736                                    struct thandle *th)
737 {
738         int rc;
739         ENTRY;
740
741         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
742
743         RETURN(rc);
744 }
745
746 static int update_recovery_attr_set(const struct lu_env *env,
747                                     struct dt_object *dt_obj,
748                                     const struct update_op *op,
749                                     const struct update_params *params,
750                                     struct thandle_exec_args *ta,
751                                     struct thandle *th)
752 {
753         struct update_thread_info *uti = update_env_info(env);
754         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
755         struct obdo     *wobdo;
756         struct obdo     *lobdo = &uti->uti_obdo;
757         struct lu_attr  *attr = &uti->uti_attr;
758         __u16           size;
759         unsigned int    param_count;
760         int             rc;
761         ENTRY;
762
763         param_count = lur->lur_update_rec.ur_param_count;
764         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
765                                             param_count, &size);
766         if (wobdo == NULL)
767                 RETURN(-EIO);
768         if (size != sizeof(*wobdo))
769                 RETURN(-EIO);
770
771         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
772                 lustre_swab_obdo(wobdo);
773
774         lustre_get_wire_obdo(NULL, lobdo, wobdo);
775         la_from_obdo(attr, lobdo, lobdo->o_valid);
776
777         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
778
779         RETURN(rc);
780 }
781
782 static int update_recovery_xattr_set(const struct lu_env *env,
783                                      struct dt_object *dt_obj,
784                                      const struct update_op *op,
785                                      const struct update_params *params,
786                                      struct thandle_exec_args *ta,
787                                      struct thandle *th)
788 {
789         struct update_thread_info *uti = update_env_info(env);
790         char            *buf;
791         char            *name;
792         int             fl;
793         __u16           size;
794         __u32           param_count;
795         int             rc;
796         ENTRY;
797
798         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
799         name = update_params_get_param_buf(params,
800                                            op->uop_params_off[0],
801                                            param_count, &size);
802         if (name == NULL)
803                 RETURN(-EIO);
804
805         buf = update_params_get_param_buf(params,
806                                           op->uop_params_off[1],
807                                           param_count, &size);
808         if (buf == NULL)
809                 RETURN(-EIO);
810
811         uti->uti_buf.lb_buf = buf;
812         uti->uti_buf.lb_len = (size_t)size;
813
814         buf = update_params_get_param_buf(params, op->uop_params_off[2],
815                                           param_count, &size);
816         if (buf == NULL)
817                 RETURN(-EIO);
818         if (size != sizeof(fl))
819                 RETURN(-EIO);
820
821         fl = le32_to_cpu(*(int *)buf);
822
823         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
824                               NULL, 0);
825
826         RETURN(rc);
827 }
828
829 static int update_recovery_index_insert(const struct lu_env *env,
830                                         struct dt_object *dt_obj,
831                                         const struct update_op *op,
832                                         const struct update_params *params,
833                                         struct thandle_exec_args *ta,
834                                         struct thandle *th)
835 {
836         struct update_thread_info *uti = update_env_info(env);
837         struct lu_fid           *fid;
838         char                    *name;
839         __u32                   param_count;
840         __u32                   *ptype;
841         __u32                   type;
842         __u16                   size;
843         int rc;
844         ENTRY;
845
846         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
847         name = update_params_get_param_buf(params, op->uop_params_off[0],
848                                            param_count, &size);
849         if (name == NULL)
850                 RETURN(-EIO);
851
852         fid = update_params_get_param_buf(params, op->uop_params_off[1],
853                                           param_count, &size);
854         if (fid == NULL)
855                 RETURN(-EIO);
856         if (size != sizeof(*fid))
857                 RETURN(-EIO);
858
859         fid_le_to_cpu(fid, fid);
860
861         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
862                                             param_count, &size);
863         if (ptype == NULL)
864                 RETURN(-EIO);
865         if (size != sizeof(*ptype))
866                 RETURN(-EIO);
867         type = le32_to_cpu(*ptype);
868
869         if (dt_try_as_dir(env, dt_obj) == 0)
870                 RETURN(-ENOTDIR);
871
872         uti->uti_rec.rec_fid = fid;
873         uti->uti_rec.rec_type = type;
874
875         rc = out_tx_index_insert(env, dt_obj,
876                                  (const struct dt_rec *)&uti->uti_rec,
877                                  (const struct dt_key *)name, ta, th,
878                                  NULL, 0);
879
880         RETURN(rc);
881 }
882
883 static int update_recovery_index_delete(const struct lu_env *env,
884                                         struct dt_object *dt_obj,
885                                         const struct update_op *op,
886                                         const struct update_params *params,
887                                         struct thandle_exec_args *ta,
888                                         struct thandle *th)
889 {
890         struct update_thread_info *uti = update_env_info(env);
891         __u32   param_count;
892         char    *name;
893         __u16   size;
894         int     rc;
895         ENTRY;
896
897         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
898         name = update_params_get_param_buf(params, op->uop_params_off[0],
899                                            param_count, &size);
900         if (name == NULL)
901                 RETURN(-EIO);
902
903         if (dt_try_as_dir(env, dt_obj) == 0)
904                 RETURN(-ENOTDIR);
905
906         rc = out_tx_index_delete(env, dt_obj,
907                                  (const struct dt_key *)name, ta, th, NULL, 0);
908
909         RETURN(rc);
910 }
911
912 static int update_recovery_write(const struct lu_env *env,
913                                  struct dt_object *dt_obj,
914                                  const struct update_op *op,
915                                  const struct update_params *params,
916                                  struct thandle_exec_args *ta,
917                                  struct thandle *th)
918 {
919         struct update_thread_info *uti = update_env_info(env);
920         char            *buf;
921         __u32           param_count;
922         __u64           pos;
923         __u16           size;
924         int rc;
925         ENTRY;
926
927         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
928         buf = update_params_get_param_buf(params, op->uop_params_off[0],
929                                           param_count, &size);
930         if (buf == NULL)
931                 RETURN(-EIO);
932
933         uti->uti_buf.lb_buf = buf;
934         uti->uti_buf.lb_len = size;
935
936         buf = update_params_get_param_buf(params, op->uop_params_off[1],
937                                           param_count, &size);
938         if (buf == NULL)
939                 RETURN(-EIO);
940
941         pos = le64_to_cpu(*(__u64 *)buf);
942
943         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
944                           ta, th, NULL, 0);
945
946         RETURN(rc);
947 }
948
949 static int update_recovery_xattr_del(const struct lu_env *env,
950                                      struct dt_object *dt_obj,
951                                      const struct update_op *op,
952                                      const struct update_params *params,
953                                      struct thandle_exec_args *ta,
954                                      struct thandle *th)
955 {
956         struct update_thread_info *uti = update_env_info(env);
957         __u32   param_count;
958         char    *name;
959         __u16   size;
960         int     rc;
961         ENTRY;
962
963         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
964         name = update_params_get_param_buf(params, op->uop_params_off[0],
965                                            param_count, &size);
966         if (name == NULL)
967                 RETURN(-EIO);
968
969         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
970
971         RETURN(rc);
972 }
973
974 /**
975  * Execute updates in the update replay records
976  *
977  * Declare distribute txn replay by update records and add the updates
978  * to the execution list. Note: it will check if the update has been
979  * committed, and only execute the updates if it is not committed to
980  * disk.
981  *
982  * \param[in] env       execution environment
983  * \param[in] tdtd      distribute txn replay data which hold all of replay
984  *                      reqs and all replay parameters.
985  * \param[in] dtrq      distribute transaction replay req.
986  * \param[in] ta        thandle execute args.
987  *
988  * \retval              0 if declare succeeds.
989  * \retval              negative errno if declare fails.
990  */
991 static int update_recovery_exec(const struct lu_env *env,
992                                 struct target_distribute_txn_data *tdtd,
993                                 struct distribute_txn_replay_req *dtrq,
994                                 struct thandle_exec_args *ta)
995 {
996         struct llog_update_record *lur = dtrq->dtrq_lur;
997         struct update_records   *records = &lur->lur_update_rec;
998         struct update_ops       *ops = &records->ur_ops;
999         struct update_params    *params = update_records_get_params(records);
1000         struct top_thandle      *top_th = container_of(ta->ta_handle,
1001                                                        struct top_thandle,
1002                                                        tt_super);
1003         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1004         struct update_op        *op;
1005         unsigned int            i;
1006         int                     rc = 0;
1007         ENTRY;
1008
1009         /* These records have been swabbed in llog_cat_process() */
1010         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1011              i++, op = update_op_next_op(op)) {
1012                 struct lu_fid           *fid = &op->uop_fid;
1013                 struct dt_object        *dt_obj;
1014                 struct dt_object        *sub_dt_obj;
1015                 struct dt_device        *sub_dt;
1016                 struct sub_thandle      *st;
1017
1018                 if (op->uop_type == OUT_NOOP)
1019                         continue;
1020
1021                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1022                 if (IS_ERR(dt_obj)) {
1023                         rc = PTR_ERR(dt_obj);
1024                         break;
1025                 }
1026                 sub_dt_obj = dt_object_child(dt_obj);
1027
1028                 /* Create sub thandle if not */
1029                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1030                 st = lookup_sub_thandle(tmt, sub_dt);
1031                 if (st == NULL) {
1032                         st = create_sub_thandle(tmt, sub_dt);
1033                         if (IS_ERR(st))
1034                                 GOTO(next, rc = PTR_ERR(st));
1035                 }
1036
1037                 /* check if updates on the OSD/OSP are committed */
1038                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1039                 if (rc == 0)
1040                         /* If this is committed, goto next */
1041                         goto next;
1042
1043                 if (rc < 0)
1044                         GOTO(next, rc);
1045
1046                 /* Create thandle for sub thandle if needed */
1047                 if (st->st_sub_th == NULL) {
1048                         rc = sub_thandle_trans_create(env, top_th, st);
1049                         if (rc != 0)
1050                                 GOTO(next, rc);
1051                 }
1052
1053                 CDEBUG(D_HA, "replay %uth update\n", i);
1054                 switch (op->uop_type) {
1055                 case OUT_CREATE:
1056                         rc = update_recovery_create(env, sub_dt_obj,
1057                                                     op, params, ta,
1058                                                     st->st_sub_th);
1059                         break;
1060                 case OUT_DESTROY:
1061                         rc = update_recovery_destroy(env, sub_dt_obj,
1062                                                      op, params, ta,
1063                                                      st->st_sub_th);
1064                         break;
1065                 case OUT_REF_ADD:
1066                         rc = update_recovery_ref_add(env, sub_dt_obj,
1067                                                      op, params, ta,
1068                                                      st->st_sub_th);
1069                         break;
1070                 case OUT_REF_DEL:
1071                         rc = update_recovery_ref_del(env, sub_dt_obj,
1072                                                      op, params, ta,
1073                                                      st->st_sub_th);
1074                         break;
1075                 case OUT_ATTR_SET:
1076                         rc = update_recovery_attr_set(env, sub_dt_obj,
1077                                                       op, params, ta,
1078                                                       st->st_sub_th);
1079                         break;
1080                 case OUT_XATTR_SET:
1081                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1082                                                        op, params, ta,
1083                                                        st->st_sub_th);
1084                         break;
1085                 case OUT_INDEX_INSERT:
1086                         rc = update_recovery_index_insert(env, sub_dt_obj,
1087                                                           op, params, ta,
1088                                                           st->st_sub_th);
1089                         break;
1090                 case OUT_INDEX_DELETE:
1091                         rc = update_recovery_index_delete(env, sub_dt_obj,
1092                                                           op, params, ta,
1093                                                           st->st_sub_th);
1094                         break;
1095                 case OUT_WRITE:
1096                         rc = update_recovery_write(env, sub_dt_obj,
1097                                                    op, params, ta,
1098                                                    st->st_sub_th);
1099                         break;
1100                 case OUT_XATTR_DEL:
1101                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1102                                                        op, params, ta,
1103                                                        st->st_sub_th);
1104                         break;
1105                 default:
1106                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1107                         rc = -EINVAL;
1108                         break;
1109                 }
1110 next:
1111                 lu_object_put(env, &dt_obj->do_lu);
1112                 if (rc < 0)
1113                         break;
1114         }
1115
1116         ta->ta_handle->th_result = rc;
1117         RETURN(rc);
1118 }
1119
1120 /**
1121  * redo updates on MDT if needed.
1122  *
1123  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1124  * this function to replay distribute txn updates on all MDTs. It only replay
1125  * updates on the MDT where the update record is missing.
1126  *
1127  * If the update already exists on the MDT, then it does not need replay the
1128  * updates on that MDT, and only mark the sub transaction has been committed
1129  * there.
1130  *
1131  * \param[in] env       execution environment
1132  * \param[in] tdtd      target distribute txn data, which holds the replay list
1133  *                      and all parameters needed by replay process.
1134  * \param[in] dtrq      distribute txn replay req.
1135  *
1136  * \retval              0 if replay succeeds.
1137  * \retval              negative errno if replay failes.
1138  */
1139 int distribute_txn_replay_handle(struct lu_env *env,
1140                                  struct target_distribute_txn_data *tdtd,
1141                                  struct distribute_txn_replay_req *dtrq)
1142 {
1143         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1144         struct thandle_exec_args *ta;
1145         struct lu_context       session_env;
1146         struct thandle          *th = NULL;
1147         struct top_thandle      *top_th;
1148         struct top_multiple_thandle *tmt;
1149         struct thandle_update_records *tur = NULL;
1150         int                     i;
1151         int                     rc = 0;
1152         ENTRY;
1153
1154         /* initialize session, it is needed for the handler of target */
1155         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1156         if (rc) {
1157                 CERROR("%s: failure to initialize session: rc = %d\n",
1158                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1159                 RETURN(rc);
1160         }
1161         lu_context_enter(&session_env);
1162         env->le_ses = &session_env;
1163         lu_env_refill(env);
1164         update_records_dump(records, D_HA, true);
1165         th = top_trans_create(env, NULL);
1166         if (IS_ERR(th))
1167                 GOTO(exit_session, rc = PTR_ERR(th));
1168
1169         ta = &update_env_info(env)->uti_tea;
1170         ta->ta_argno = 0;
1171
1172         update_env_info(env)->uti_dtrq = dtrq;
1173         /* Create distribute transaction structure for this top thandle */
1174         top_th = container_of(th, struct top_thandle, tt_super);
1175         rc = top_trans_create_tmt(env, top_th);
1176         if (rc < 0)
1177                 GOTO(stop_trans, rc);
1178
1179         ta->ta_handle = th;
1180
1181         /* check if the distribute transaction has been committed */
1182         tmt = top_th->tt_multiple_thandle;
1183         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1184         tmt->tmt_batchid = records->ur_batchid;
1185         tgt_th_info(env)->tti_transno = records->ur_master_transno;
1186
1187         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1188                 tmt->tmt_committed = 1;
1189
1190         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1191         if (rc < 0)
1192                 GOTO(stop_trans, rc);
1193
1194         /* If no updates are needed to be replayed, then
1195          * mark this records as committed, so commit thread
1196          * distribute_txn_commit_thread() will delete the
1197          * record */
1198         if (ta->ta_argno == 0)
1199                 tmt->tmt_committed = 1;
1200
1201         tur = &update_env_info(env)->uti_tur;
1202         tur->tur_update_records = dtrq->dtrq_lur;
1203         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1204         tur->tur_update_params = NULL;
1205         tur->tur_update_param_count = 0;
1206         tmt->tmt_update_records = tur;
1207
1208         distribute_txn_insert_by_batchid(tmt);
1209         rc = top_trans_start(env, NULL, th);
1210         if (rc < 0)
1211                 GOTO(stop_trans, rc);
1212
1213         for (i = 0; i < ta->ta_argno; i++) {
1214                 struct tx_arg           *ta_arg;
1215                 struct dt_object        *dt_obj;
1216                 struct dt_device        *sub_dt;
1217                 struct sub_thandle      *st;
1218
1219                 ta_arg = ta->ta_args[i];
1220                 dt_obj = ta_arg->object;
1221
1222                 LASSERT(tmt->tmt_committed == 0);
1223                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1224                 st = lookup_sub_thandle(tmt, sub_dt);
1225                 LASSERT(st != NULL);
1226                 LASSERT(st->st_sub_th != NULL);
1227                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1228                                              ta->ta_args[i]);
1229                 if (unlikely(rc < 0)) {
1230                         CDEBUG(D_HA, "error during execution of #%u from"
1231                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1232                                ta->ta_args[i]->line, rc);
1233                         while (--i > 0) {
1234                                 if (ta->ta_args[i]->undo_fn != NULL) {
1235                                         dt_obj = ta->ta_args[i]->object;
1236                                         sub_dt =
1237                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1238                                         st = lookup_sub_thandle(tmt, sub_dt);
1239                                         LASSERT(st != NULL);
1240                                         LASSERT(st->st_sub_th != NULL);
1241
1242                                         ta->ta_args[i]->undo_fn(env,
1243                                                                st->st_sub_th,
1244                                                                ta->ta_args[i]);
1245                                 } else {
1246                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1247                                              dt_obd_name(ta->ta_handle->th_dev),
1248                                                ta->ta_args[i]->file,
1249                                                ta->ta_args[i]->line, -ENOTSUPP);
1250                                 }
1251                         }
1252                         break;
1253                 }
1254                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1255                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1256         }
1257
1258 stop_trans:
1259         if (rc < 0)
1260                 th->th_result = rc;
1261         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1262         for (i = 0; i < ta->ta_argno; i++) {
1263                 if (ta->ta_args[i]->object != NULL) {
1264                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1265                         ta->ta_args[i]->object = NULL;
1266                 }
1267         }
1268
1269         if (tur != NULL)
1270                 tur->tur_update_records = NULL;
1271 exit_session:
1272         lu_context_exit(&session_env);
1273         lu_context_fini(&session_env);
1274         RETURN(rc);
1275 }
1276 EXPORT_SYMBOL(distribute_txn_replay_handle);