Whamcloud - gitweb
LU-6602 update: split update llog record
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
 * This file implements the methods that handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. Each replay comes either from a client replay request or from
 * an update replay record (for a distributed transaction) in the update log.
 * For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * MDTs. For each distributed operation it checks the updates on all MDTs,
 * and if some update records are missing on some MDTs, the replay thread
 * redoes the updates on those MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
49 #include <obd.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
52
53 /**
54  * Lookup distribute_txn_replay req
55  *
56  * Lookup distribute_txn_replay in the replay list by batchid.
57  * It is assumed the list has been locked before calling this function.
58  *
59  * \param[in] tdtd      distribute_txn_data, which holds the replay
60  *                      list.
61  * \param[in] batchid   batchid used by lookup.
62  *
63  * \retval              pointer of the replay if succeeds.
64  * \retval              NULL if can not find it.
65  */
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
68 {
69         struct distribute_txn_replay_req        *tmp;
70         struct distribute_txn_replay_req        *dtrq = NULL;
71
72         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73                 if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
74                         dtrq = tmp;
75                         break;
76                 }
77         }
78         return dtrq;
79 }
80
81 /**
82  * insert distribute txn replay req
83  *
84  * Insert distribute txn replay to the replay list, and it assumes the
85  * list has been looked. Note: the replay list is a sorted list, which
86  * is sorted by master transno. It is assumed the replay list has been
87  * locked before calling this function.
88  *
89  * \param[in] tdtd      target distribute txn data where replay list is
90  * \param[in] new       distribute txn replay to be inserted
91  *
92  * \retval              0 if insertion succeeds
93  * \retval              EEXIST if the dtrq already exists
94  */
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96                         struct distribute_txn_replay_req *new)
97 {
98         struct distribute_txn_replay_req *iter;
99
100         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
101                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
102                     new->dtrq_lur->lur_update_rec.ur_master_transno)
103                         continue;
104
105                 /* If there are mulitple replay req with same transno, then
106                  * sort them with batchid */
107                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
108                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
109                     iter->dtrq_lur->lur_update_rec.ur_batchid ==
110                     new->dtrq_lur->lur_update_rec.ur_batchid)
111                         return -EEXIST;
112
113                 /* If there are mulitple replay req with same transno, then
114                  * sort them with batchid */
115                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
116                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
117                     iter->dtrq_lur->lur_update_rec.ur_batchid >
118                     new->dtrq_lur->lur_update_rec.ur_batchid)
119                         continue;
120
121                 list_add(&new->dtrq_list, &iter->dtrq_list);
122                 break;
123         }
124
125         if (list_empty(&new->dtrq_list))
126                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
127
128         return 0;
129 }
130
131 /**
132  * create distribute txn replay req
133  *
134  * Allocate distribute txn replay req according to the update records.
135  *
136  * \param[in] tdtd      target distribute txn data where replay list is.
137  * \param[in] record    update records from the update log.
138  *
139  * \retval              the pointer of distribute txn replay req if
140  *                      the creation succeeds.
141  * \retval              NULL if the creation fails.
142  */
143 static struct distribute_txn_replay_req *
144 dtrq_create(struct llog_update_record *lur)
145 {
146         struct distribute_txn_replay_req *new;
147
148         OBD_ALLOC_PTR(new);
149         if (new == NULL)
150                 RETURN(ERR_PTR(-ENOMEM));
151
152         new->dtrq_lur_size = llog_update_record_size(lur);
153         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
154         if (new->dtrq_lur == NULL) {
155                 OBD_FREE_PTR(new);
156                 RETURN(ERR_PTR(-ENOMEM));
157         }
158
159         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
160
161         spin_lock_init(&new->dtrq_sub_list_lock);
162         INIT_LIST_HEAD(&new->dtrq_sub_list);
163         INIT_LIST_HEAD(&new->dtrq_list);
164
165         RETURN(new);
166 }
167
168 /**
169  * Lookup distribute sub replay
170  *
171  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
172  * mdt_index.
173  *
174  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
175  * \param[in] mdt_index                 the mdt_index as the key of lookup
176  *
177  * \retval              the pointer of sub replay if it can be found.
178  * \retval              NULL if it can not find.
179  */
180 struct distribute_txn_replay_req_sub *
181 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
182 {
183         struct distribute_txn_replay_req_sub *dtrqs = NULL;
184         struct distribute_txn_replay_req_sub *tmp;
185
186         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
187                 if (tmp->dtrqs_mdt_index == mdt_index) {
188                         dtrqs = tmp;
189                         break;
190                 }
191         }
192         return dtrqs;
193 }
194
195 /**
196  * Try to add cookie to sub distribute txn request
197  *
198  * Check if the update log cookie has been added to the request, if not,
199  * add it to the dtrqs_cookie_list.
200  *
201  * \param[in] dtrqs     sub replay req where cookies to be added.
202  * \param[in] cookie    cookie to be added.
203  *
204  * \retval              0 if the cookie is adding succeeds.
205  * \retval              negative errno if adding fails.
206  */
207 static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
208                                struct llog_cookie *cookie)
209 {
210         struct sub_thandle_cookie *new;
211
212         OBD_ALLOC_PTR(new);
213         if (new == NULL)
214                 return -ENOMEM;
215
216         INIT_LIST_HEAD(&new->stc_list);
217         new->stc_cookie = *cookie;
218         /* Note: only single thread will access one sub_request each time,
219          * so no need lock here */
220         list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
221
222         return 0;
223 }
224
225 /**
226  * Insert distribute txn sub req replay
227  *
228  * Allocate sub replay req and insert distribute txn replay list.
229  *
230  * \param[in] dtrq      d to be added
231  * \param[in] cookie    the cookie of the update record
232  * \param[in] mdt_index the mdt_index of the update record
233  *
234  * \retval              0 if the adding succeeds.
235  * \retval              negative errno if the adding fails.
236  */
237 static int
238 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
239                            struct llog_cookie *cookie,
240                            __u32 mdt_index)
241 {
242         struct distribute_txn_replay_req_sub    *dtrqs = NULL;
243         struct distribute_txn_replay_req_sub    *new;
244         int                                     rc;
245         ENTRY;
246
247         spin_lock(&dtrq->dtrq_sub_list_lock);
248         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
249         spin_unlock(&dtrq->dtrq_sub_list_lock);
250         if (dtrqs != NULL) {
251                 rc = dtrq_sub_add_cookie(dtrqs, cookie);
252                 RETURN(0);
253         }
254
255         OBD_ALLOC_PTR(new);
256         if (new == NULL)
257                 RETURN(-ENOMEM);
258
259         INIT_LIST_HEAD(&new->dtrqs_list);
260         INIT_LIST_HEAD(&new->dtrqs_cookie_list);
261         new->dtrqs_mdt_index = mdt_index;
262         spin_lock(&dtrq->dtrq_sub_list_lock);
263         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
264         if (dtrqs == NULL) {
265                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
266                 dtrqs = new;
267         } else {
268                 OBD_FREE_PTR(new);
269         }
270         spin_unlock(&dtrq->dtrq_sub_list_lock);
271
272         rc = dtrq_sub_add_cookie(dtrqs, cookie);
273
274         RETURN(rc);
275 }
276
277 /**
278  * append updates to the current replay updates
279  *
280  * Append more updates to the existent replay update. And this is only
281  * used when combining mulitple updates into one large updates during
282  * replay.
283  *
284  * \param[in] dtrq      the update replay request where the new update
285  *                      records will be added.
286  * \param[in] lur       the new update record.
287  *
288  * \retval              0 if appending succeeds.
289  * \retval              negative errno if appending fails.
290  */
291 static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
292                                struct update_records *record)
293 {
294         struct llog_update_record *new_lur;
295         size_t lur_size = dtrq->dtrq_lur_size;
296         void *ptr;
297         ENTRY;
298
299         /* Because several threads might retrieve the same records from
300          * different targets, and we only need one copy of records. So
301          * we will check if the records is in the next one, if not, just
302          * skip it */
303         spin_lock(&dtrq->dtrq_sub_list_lock);
304         if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
305                 spin_unlock(&dtrq->dtrq_sub_list_lock);
306                 RETURN(0);
307         }
308         dtrq->dtrq_lur->lur_update_rec.ur_index++;
309         spin_unlock(&dtrq->dtrq_sub_list_lock);
310
311         lur_size += update_records_size(record);
312         OBD_ALLOC_LARGE(new_lur, lur_size);
313         if (new_lur == NULL) {
314                 spin_lock(&dtrq->dtrq_sub_list_lock);
315                 dtrq->dtrq_lur->lur_update_rec.ur_index--;
316                 spin_unlock(&dtrq->dtrq_sub_list_lock);
317                 RETURN(-ENOMEM);
318         }
319
320         /* Copy the old and new records to the new allocated buffer */
321         memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
322         ptr = (char *)&new_lur->lur_update_rec +
323                 update_records_size(&new_lur->lur_update_rec);
324         memcpy(ptr, &record->ur_ops,
325                update_records_size(record) -
326                offsetof(struct update_records, ur_ops));
327
328         new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
329         new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
330         new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
331
332         /* Replace the records */
333         OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
334         dtrq->dtrq_lur = new_lur;
335         dtrq->dtrq_lur_size = lur_size;
336         dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
337         update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
338         RETURN(0);
339 }
340
341 /**
342  * Insert update records to the replay list.
343  *
344  * Allocate distribute txn replay req and insert it into the replay
345  * list, then insert the update records into the replay req.
346  *
347  * \param[in] tdtd      distribute txn replay data where the replay list
348  *                      is.
349  * \param[in] record    the update record
350  * \param[in] cookie    cookie of the record
351  * \param[in] index     mdt index of the record
352  *
353  * \retval              0 if the adding succeeds.
354  * \retval              negative errno if the adding fails.
355  */
356 int
357 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
358                                      struct llog_update_record *lur,
359                                      struct llog_cookie *cookie,
360                                      __u32 mdt_index)
361 {
362         struct distribute_txn_replay_req *dtrq;
363         struct update_records            *record = &lur->lur_update_rec;
364         int rc = 0;
365         ENTRY;
366
367         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
368                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
369                record->ur_batchid, record->ur_master_transno, mdt_index);
370
371         /* First try to build the replay update request with the records */
372         spin_lock(&tdtd->tdtd_replay_list_lock);
373         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
374         spin_unlock(&tdtd->tdtd_replay_list_lock);
375         if (dtrq == NULL) {
376                 /* If the transno in the update record is 0, it means the
377                  * update are from master MDT, and we will use the master
378                  * last committed transno as its batchid. Note: if it got
379                  * the records from the slave later, it needs to update
380                  * the batchid by the transno in slave update log (see below) */
381                 dtrq = dtrq_create(lur);
382                 if (IS_ERR(dtrq))
383                         RETURN(PTR_ERR(dtrq));
384
385                 if (record->ur_master_transno == 0)
386                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
387                                 tdtd->tdtd_lut->lut_last_transno;
388                 spin_lock(&tdtd->tdtd_replay_list_lock);
389                 rc = dtrq_insert(tdtd, dtrq);
390                 spin_unlock(&tdtd->tdtd_replay_list_lock);
391                 if (rc == -EEXIST) {
392                         /* Some one else already add the record */
393                         dtrq_destroy(dtrq);
394                         rc = 0;
395                 }
396         } else {
397                 struct update_records *dtrq_rec;
398
399                 /* If the master transno in update header is not
400                  * matched with the one in the record, then it means
401                  * the dtrq is originally created by master record,
402                  * and we need update master transno and reposition
403                  * the dtrq(by master transno). */
404                 dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
405                 if (record->ur_master_transno != 0 &&
406                     dtrq_rec->ur_master_transno != record->ur_master_transno) {
407                         dtrq_rec->ur_master_transno = record->ur_master_transno;
408                         spin_lock(&tdtd->tdtd_replay_list_lock);
409                         list_del_init(&dtrq->dtrq_list);
410                         rc = dtrq_insert(tdtd, dtrq);
411                         spin_unlock(&tdtd->tdtd_replay_list_lock);
412                         if (rc < 0)
413                                 return rc;
414                 }
415
416                 /* This is a partial update records, let's try to append
417                  * the record to the current replay request */
418                 if (record->ur_flags & UPDATE_RECORD_CONTINUE)
419                         rc = dtrq_append_updates(dtrq, record);
420         }
421
422         /* Then create and add sub update request */
423         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
424
425         RETURN(rc);
426 }
427 EXPORT_SYMBOL(insert_update_records_to_replay_list);
428
429 /**
430  * Dump updates of distribute txns.
431  *
432  * Output all of recovery updates in the distribute txn list to the
433  * debug log.
434  *
435  * \param[in] tdtd      distribute txn data where all of distribute txn
436  *                      are listed.
437  * \param[in] mask      debug mask
438  */
439 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
440 {
441         struct distribute_txn_replay_req *dtrq;
442
443         spin_lock(&tdtd->tdtd_replay_list_lock);
444         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
445                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
446                                     false);
447         spin_unlock(&tdtd->tdtd_replay_list_lock);
448 }
449 EXPORT_SYMBOL(dtrq_list_dump);
450
451 /**
452  * Destroy distribute txn replay req
453  *
454  * Destroy distribute txn replay req and all of subs.
455  *
456  * \param[in] dtrq      distribute txn replqy req to be destroyed.
457  */
458 void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
459 {
460         struct distribute_txn_replay_req_sub    *dtrqs;
461         struct distribute_txn_replay_req_sub    *tmp;
462
463         LASSERT(list_empty(&dtrq->dtrq_list));
464         spin_lock(&dtrq->dtrq_sub_list_lock);
465         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
466                 struct sub_thandle_cookie *stc;
467                 struct sub_thandle_cookie *tmp;
468
469                 list_del(&dtrqs->dtrqs_list);
470                 list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
471                                          stc_list) {
472                         list_del(&stc->stc_list);
473                         OBD_FREE_PTR(stc);
474                 }
475                 OBD_FREE_PTR(dtrqs);
476         }
477         spin_unlock(&dtrq->dtrq_sub_list_lock);
478
479         if (dtrq->dtrq_lur != NULL)
480                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
481
482         OBD_FREE_PTR(dtrq);
483 }
484 EXPORT_SYMBOL(dtrq_destroy);
485
486 /**
487  * Destroy all of replay req.
488  *
489  * Destroy all of replay req in the replay list.
490  *
491  * \param[in] tdtd      target distribute txn data where the replay list is.
492  */
493 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
494 {
495         struct distribute_txn_replay_req *dtrq;
496         struct distribute_txn_replay_req *tmp;
497
498         spin_lock(&tdtd->tdtd_replay_list_lock);
499         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
500                                  dtrq_list) {
501                 list_del_init(&dtrq->dtrq_list);
502                 dtrq_destroy(dtrq);
503         }
504         spin_unlock(&tdtd->tdtd_replay_list_lock);
505 }
506 EXPORT_SYMBOL(dtrq_list_destroy);
507
508 /**
509  * Get next req in the replay list
510  *
511  * Get next req needs to be replayed, since it is a sorted list
512  * (by master MDT transno)
513  *
514  * \param[in] tdtd      distribute txn data where the replay list is
515  *
516  * \retval              the pointer of update recovery header
517  */
518 struct distribute_txn_replay_req *
519 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
520 {
521         struct distribute_txn_replay_req *dtrq = NULL;
522
523         spin_lock(&tdtd->tdtd_replay_list_lock);
524         if (!list_empty(&tdtd->tdtd_replay_list)) {
525                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
526                                  struct distribute_txn_replay_req, dtrq_list);
527                 list_del_init(&dtrq->dtrq_list);
528         }
529         spin_unlock(&tdtd->tdtd_replay_list_lock);
530
531         return dtrq;
532 }
533 EXPORT_SYMBOL(distribute_txn_get_next_req);
534
535 /**
536  * Get next transno in the replay list, because this is the sorted
537  * list, so it will return the transno of next req in the list.
538  *
539  * \param[in] tdtd      distribute txn data where the replay list is
540  *
541  * \retval              the transno of next update in the list
542  */
543 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
544 {
545         struct distribute_txn_replay_req        *dtrq = NULL;
546         __u64                                   transno = 0;
547
548         spin_lock(&tdtd->tdtd_replay_list_lock);
549         if (!list_empty(&tdtd->tdtd_replay_list)) {
550                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
551                                  struct distribute_txn_replay_req, dtrq_list);
552                 transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
553         }
554         spin_unlock(&tdtd->tdtd_replay_list_lock);
555
556         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
557                tdtd->tdtd_lut->lut_obd->obd_name, transno);
558         return transno;
559 }
560 EXPORT_SYMBOL(distribute_txn_get_next_transno);
561
562 /**
563  * Check if the update of one object is committed
564  *
565  * Check whether the update for the object is committed by checking whether
566  * the correspondent sub exists in the replay req. If it is committed, mark
567  * the committed flag in correspondent the sub thandle.
568  *
569  * \param[in] env       execution environment
570  * \param[in] dtrq      replay request
571  * \param[in] dt_obj    object for the update
572  * \param[in] top_th    top thandle
573  * \param[in] sub_th    sub thandle which the update belongs to
574  *
575  * \retval              1 if the update is not committed.
576  * \retval              0 if the update is committed.
577  * \retval              negative errno if some other failures happen.
578  */
579 static int update_is_committed(const struct lu_env *env,
580                                struct distribute_txn_replay_req *dtrq,
581                                struct dt_object *dt_obj,
582                                struct top_thandle *top_th,
583                                struct sub_thandle *st)
584 {
585         struct seq_server_site  *seq_site;
586         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
587         struct distribute_txn_replay_req_sub    *dtrqs;
588         __u32                   mdt_index;
589         ENTRY;
590
591         if (st->st_sub_th != NULL)
592                 RETURN(1);
593
594         if (st->st_committed)
595                 RETURN(0);
596
597         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
598         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
599                 mdt_index = fid_oid(fid);
600         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
601                 mdt_index = seq_site->ss_node_id;
602         } else {
603                 struct lu_server_fld *fld;
604                 struct lu_seq_range range = {0};
605                 int rc;
606
607                 fld = seq_site->ss_server_fld;
608                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
609                 LASSERT(fld->lsf_seq_lookup != NULL);
610                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
611                                          &range);
612                 if (rc < 0)
613                         RETURN(rc);
614                 mdt_index = range.lsr_index;
615         }
616
617         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
618         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
619                 st->st_committed = 1;
620                 if (dtrqs != NULL) {
621                         struct sub_thandle_cookie *stc;
622                         struct sub_thandle_cookie *tmp;
623
624                         list_for_each_entry_safe(stc, tmp,
625                                                  &dtrqs->dtrqs_cookie_list,
626                                                  stc_list)
627                                 list_move(&stc->stc_list, &st->st_cookie_list);
628                 }
629                 RETURN(0);
630         }
631
632         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
633                mdt_index);
634
635         RETURN(1);
636 }
637
/**
 * Implementation of different update methods for update recovery.
 *
 * The following functions update_recovery_$(update_name) implement the
 * recovery methods for the different update types. They extract the
 * parameters from the common parameter area and call the corresponding
 * out_tx_* helper to redo the update.
 *
 * \param[in] env	execution environment
 * \param[in] dt_obj	object the update is applied to
 * \param[in] op	update operation to be replayed
 * \param[in] params	common update parameters which hold all parameters
 *			of the operation
 * \param[in] ta	thandle execution args, passed through to the
 *			out_tx_* helper
 * \param[in] th	transaction handle
 *
 * \retval		0 if it succeeds.
 * \retval		negative errno if it fails.
 */
657 static int update_recovery_create(const struct lu_env *env,
658                                   struct dt_object *dt_obj,
659                                   const struct update_op *op,
660                                   const struct update_params *params,
661                                   struct thandle_exec_args *ta,
662                                   struct thandle *th)
663 {
664         struct update_thread_info *uti = update_env_info(env);
665         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
666         struct lu_attr          *attr = &uti->uti_attr;
667         struct obdo             *wobdo;
668         struct obdo             *lobdo = &uti->uti_obdo;
669         struct dt_object_format dof;
670         __u16                   size;
671         unsigned int            param_count;
672         int rc;
673         ENTRY;
674
675         if (dt_object_exists(dt_obj))
676                 RETURN(-EEXIST);
677
678         param_count = lur->lur_update_rec.ur_param_count;
679         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
680                                             param_count, &size);
681         if (wobdo == NULL)
682                 RETURN(-EIO);
683         if (size != sizeof(*wobdo))
684                 RETURN(-EIO);
685
686         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
687                 lustre_swab_obdo(wobdo);
688
689         lustre_get_wire_obdo(NULL, lobdo, wobdo);
690         la_from_obdo(attr, lobdo, lobdo->o_valid);
691
692         dof.dof_type = dt_mode_to_dft(attr->la_mode);
693
694         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
695                            ta, th, NULL, 0);
696
697         RETURN(rc);
698 }
699
700 static int update_recovery_destroy(const struct lu_env *env,
701                                    struct dt_object *dt_obj,
702                                    const struct update_op *op,
703                                    const struct update_params *params,
704                                    struct thandle_exec_args *ta,
705                                    struct thandle *th)
706 {
707         int rc;
708         ENTRY;
709
710         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
711
712         RETURN(rc);
713 }
714
715 static int update_recovery_ref_add(const struct lu_env *env,
716                                    struct dt_object *dt_obj,
717                                    const struct update_op *op,
718                                    const struct update_params *params,
719                                    struct thandle_exec_args *ta,
720                                    struct thandle *th)
721 {
722         int rc;
723         ENTRY;
724
725         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
726
727         RETURN(rc);
728 }
729
730 static int update_recovery_ref_del(const struct lu_env *env,
731                                    struct dt_object *dt_obj,
732                                    const struct update_op *op,
733                                    const struct update_params *params,
734                                    struct thandle_exec_args *ta,
735                                    struct thandle *th)
736 {
737         int rc;
738         ENTRY;
739
740         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
741
742         RETURN(rc);
743 }
744
745 static int update_recovery_attr_set(const struct lu_env *env,
746                                     struct dt_object *dt_obj,
747                                     const struct update_op *op,
748                                     const struct update_params *params,
749                                     struct thandle_exec_args *ta,
750                                     struct thandle *th)
751 {
752         struct update_thread_info *uti = update_env_info(env);
753         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
754         struct obdo     *wobdo;
755         struct obdo     *lobdo = &uti->uti_obdo;
756         struct lu_attr  *attr = &uti->uti_attr;
757         __u16           size;
758         unsigned int    param_count;
759         int             rc;
760         ENTRY;
761
762         param_count = lur->lur_update_rec.ur_param_count;
763         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
764                                             param_count, &size);
765         if (wobdo == NULL)
766                 RETURN(-EIO);
767         if (size != sizeof(*wobdo))
768                 RETURN(-EIO);
769
770         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
771                 lustre_swab_obdo(wobdo);
772
773         lustre_get_wire_obdo(NULL, lobdo, wobdo);
774         la_from_obdo(attr, lobdo, lobdo->o_valid);
775
776         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
777
778         RETURN(rc);
779 }
780
781 static int update_recovery_xattr_set(const struct lu_env *env,
782                                      struct dt_object *dt_obj,
783                                      const struct update_op *op,
784                                      const struct update_params *params,
785                                      struct thandle_exec_args *ta,
786                                      struct thandle *th)
787 {
788         struct update_thread_info *uti = update_env_info(env);
789         char            *buf;
790         char            *name;
791         int             fl;
792         __u16           size;
793         __u32           param_count;
794         int             rc;
795         ENTRY;
796
797         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
798         name = update_params_get_param_buf(params,
799                                            op->uop_params_off[0],
800                                            param_count, &size);
801         if (name == NULL)
802                 RETURN(-EIO);
803
804         buf = update_params_get_param_buf(params,
805                                           op->uop_params_off[1],
806                                           param_count, &size);
807         if (buf == NULL)
808                 RETURN(-EIO);
809
810         uti->uti_buf.lb_buf = buf;
811         uti->uti_buf.lb_len = (size_t)size;
812
813         buf = update_params_get_param_buf(params, op->uop_params_off[2],
814                                           param_count, &size);
815         if (buf == NULL)
816                 RETURN(-EIO);
817         if (size != sizeof(fl))
818                 RETURN(-EIO);
819
820         fl = le32_to_cpu(*(int *)buf);
821
822         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
823                               NULL, 0);
824
825         RETURN(rc);
826 }
827
828 static int update_recovery_index_insert(const struct lu_env *env,
829                                         struct dt_object *dt_obj,
830                                         const struct update_op *op,
831                                         const struct update_params *params,
832                                         struct thandle_exec_args *ta,
833                                         struct thandle *th)
834 {
835         struct update_thread_info *uti = update_env_info(env);
836         struct lu_fid           *fid;
837         char                    *name;
838         __u32                   param_count;
839         __u32                   *ptype;
840         __u32                   type;
841         __u16                   size;
842         int rc;
843         ENTRY;
844
845         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
846         name = update_params_get_param_buf(params, op->uop_params_off[0],
847                                            param_count, &size);
848         if (name == NULL)
849                 RETURN(-EIO);
850
851         fid = update_params_get_param_buf(params, op->uop_params_off[1],
852                                           param_count, &size);
853         if (fid == NULL)
854                 RETURN(-EIO);
855         if (size != sizeof(*fid))
856                 RETURN(-EIO);
857
858         fid_le_to_cpu(fid, fid);
859
860         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
861                                             param_count, &size);
862         if (ptype == NULL)
863                 RETURN(-EIO);
864         if (size != sizeof(*ptype))
865                 RETURN(-EIO);
866         type = le32_to_cpu(*ptype);
867
868         if (dt_try_as_dir(env, dt_obj) == 0)
869                 RETURN(-ENOTDIR);
870
871         uti->uti_rec.rec_fid = fid;
872         uti->uti_rec.rec_type = type;
873
874         rc = out_tx_index_insert(env, dt_obj,
875                                  (const struct dt_rec *)&uti->uti_rec,
876                                  (const struct dt_key *)name, ta, th,
877                                  NULL, 0);
878
879         RETURN(rc);
880 }
881
882 static int update_recovery_index_delete(const struct lu_env *env,
883                                         struct dt_object *dt_obj,
884                                         const struct update_op *op,
885                                         const struct update_params *params,
886                                         struct thandle_exec_args *ta,
887                                         struct thandle *th)
888 {
889         struct update_thread_info *uti = update_env_info(env);
890         __u32   param_count;
891         char    *name;
892         __u16   size;
893         int     rc;
894         ENTRY;
895
896         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
897         name = update_params_get_param_buf(params, op->uop_params_off[0],
898                                            param_count, &size);
899         if (name == NULL)
900                 RETURN(-EIO);
901
902         if (dt_try_as_dir(env, dt_obj) == 0)
903                 RETURN(-ENOTDIR);
904
905         rc = out_tx_index_delete(env, dt_obj,
906                                  (const struct dt_key *)name, ta, th, NULL, 0);
907
908         RETURN(rc);
909 }
910
911 static int update_recovery_write(const struct lu_env *env,
912                                  struct dt_object *dt_obj,
913                                  const struct update_op *op,
914                                  const struct update_params *params,
915                                  struct thandle_exec_args *ta,
916                                  struct thandle *th)
917 {
918         struct update_thread_info *uti = update_env_info(env);
919         char            *buf;
920         __u32           param_count;
921         __u64           pos;
922         __u16           size;
923         int rc;
924         ENTRY;
925
926         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
927         buf = update_params_get_param_buf(params, op->uop_params_off[0],
928                                           param_count, &size);
929         if (buf == NULL)
930                 RETURN(-EIO);
931
932         uti->uti_buf.lb_buf = buf;
933         uti->uti_buf.lb_len = size;
934
935         buf = update_params_get_param_buf(params, op->uop_params_off[1],
936                                           param_count, &size);
937         if (buf == NULL)
938                 RETURN(-EIO);
939
940         pos = le64_to_cpu(*(__u64 *)buf);
941
942         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
943                           ta, th, NULL, 0);
944
945         RETURN(rc);
946 }
947
948 static int update_recovery_xattr_del(const struct lu_env *env,
949                                      struct dt_object *dt_obj,
950                                      const struct update_op *op,
951                                      const struct update_params *params,
952                                      struct thandle_exec_args *ta,
953                                      struct thandle *th)
954 {
955         struct update_thread_info *uti = update_env_info(env);
956         __u32   param_count;
957         char    *name;
958         __u16   size;
959         int     rc;
960         ENTRY;
961
962         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
963         name = update_params_get_param_buf(params, op->uop_params_off[0],
964                                            param_count, &size);
965         if (name == NULL)
966                 RETURN(-EIO);
967
968         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
969
970         RETURN(rc);
971 }
972
973 /**
974  * Execute updates in the update replay records
975  *
976  * Declare distribute txn replay by update records and add the updates
977  * to the execution list. Note: it will check if the update has been
978  * committed, and only execute the updates if it is not committed to
979  * disk.
980  *
981  * \param[in] env       execution environment
982  * \param[in] tdtd      distribute txn replay data which hold all of replay
983  *                      reqs and all replay parameters.
984  * \param[in] dtrq      distribute transaction replay req.
985  * \param[in] ta        thandle execute args.
986  *
987  * \retval              0 if declare succeeds.
988  * \retval              negative errno if declare fails.
989  */
990 static int update_recovery_exec(const struct lu_env *env,
991                                 struct target_distribute_txn_data *tdtd,
992                                 struct distribute_txn_replay_req *dtrq,
993                                 struct thandle_exec_args *ta)
994 {
995         struct llog_update_record *lur = dtrq->dtrq_lur;
996         struct update_records   *records = &lur->lur_update_rec;
997         struct update_ops       *ops = &records->ur_ops;
998         struct update_params    *params = update_records_get_params(records);
999         struct top_thandle      *top_th = container_of(ta->ta_handle,
1000                                                        struct top_thandle,
1001                                                        tt_super);
1002         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
1003         struct update_op        *op;
1004         unsigned int            i;
1005         int                     rc = 0;
1006         ENTRY;
1007
1008         /* These records have been swabbed in llog_cat_process() */
1009         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
1010              i++, op = update_op_next_op(op)) {
1011                 struct lu_fid           *fid = &op->uop_fid;
1012                 struct dt_object        *dt_obj;
1013                 struct dt_object        *sub_dt_obj;
1014                 struct dt_device        *sub_dt;
1015                 struct sub_thandle      *st;
1016
1017                 if (op->uop_type == OUT_NOOP)
1018                         continue;
1019
1020                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
1021                 if (IS_ERR(dt_obj)) {
1022                         rc = PTR_ERR(dt_obj);
1023                         break;
1024                 }
1025                 sub_dt_obj = dt_object_child(dt_obj);
1026
1027                 /* Create sub thandle if not */
1028                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
1029                 st = lookup_sub_thandle(tmt, sub_dt);
1030                 if (st == NULL) {
1031                         st = create_sub_thandle(tmt, sub_dt);
1032                         if (IS_ERR(st))
1033                                 GOTO(next, rc = PTR_ERR(st));
1034                 }
1035
1036                 /* check if updates on the OSD/OSP are committed */
1037                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
1038                 if (rc == 0)
1039                         /* If this is committed, goto next */
1040                         goto next;
1041
1042                 if (rc < 0)
1043                         GOTO(next, rc);
1044
1045                 /* Create thandle for sub thandle if needed */
1046                 if (st->st_sub_th == NULL) {
1047                         rc = sub_thandle_trans_create(env, top_th, st);
1048                         if (rc != 0)
1049                                 GOTO(next, rc);
1050                 }
1051
1052                 CDEBUG(D_HA, "replay %uth update\n", i);
1053                 switch (op->uop_type) {
1054                 case OUT_CREATE:
1055                         rc = update_recovery_create(env, sub_dt_obj,
1056                                                     op, params, ta,
1057                                                     st->st_sub_th);
1058                         break;
1059                 case OUT_DESTROY:
1060                         rc = update_recovery_destroy(env, sub_dt_obj,
1061                                                      op, params, ta,
1062                                                      st->st_sub_th);
1063                         break;
1064                 case OUT_REF_ADD:
1065                         rc = update_recovery_ref_add(env, sub_dt_obj,
1066                                                      op, params, ta,
1067                                                      st->st_sub_th);
1068                         break;
1069                 case OUT_REF_DEL:
1070                         rc = update_recovery_ref_del(env, sub_dt_obj,
1071                                                      op, params, ta,
1072                                                      st->st_sub_th);
1073                         break;
1074                 case OUT_ATTR_SET:
1075                         rc = update_recovery_attr_set(env, sub_dt_obj,
1076                                                       op, params, ta,
1077                                                       st->st_sub_th);
1078                         break;
1079                 case OUT_XATTR_SET:
1080                         rc = update_recovery_xattr_set(env, sub_dt_obj,
1081                                                        op, params, ta,
1082                                                        st->st_sub_th);
1083                         break;
1084                 case OUT_INDEX_INSERT:
1085                         rc = update_recovery_index_insert(env, sub_dt_obj,
1086                                                           op, params, ta,
1087                                                           st->st_sub_th);
1088                         break;
1089                 case OUT_INDEX_DELETE:
1090                         rc = update_recovery_index_delete(env, sub_dt_obj,
1091                                                           op, params, ta,
1092                                                           st->st_sub_th);
1093                         break;
1094                 case OUT_WRITE:
1095                         rc = update_recovery_write(env, sub_dt_obj,
1096                                                    op, params, ta,
1097                                                    st->st_sub_th);
1098                         break;
1099                 case OUT_XATTR_DEL:
1100                         rc = update_recovery_xattr_del(env, sub_dt_obj,
1101                                                        op, params, ta,
1102                                                        st->st_sub_th);
1103                         break;
1104                 default:
1105                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
1106                         rc = -EINVAL;
1107                         break;
1108                 }
1109 next:
1110                 lu_object_put(env, &dt_obj->do_lu);
1111                 if (rc < 0)
1112                         break;
1113         }
1114
1115         ta->ta_handle->th_result = rc;
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * redo updates on MDT if needed.
1121  *
1122  * During DNE recovery, the recovery thread (target_recovery_thread) will call
1123  * this function to replay distribute txn updates on all MDTs. It only replay
1124  * updates on the MDT where the update record is missing.
1125  *
1126  * If the update already exists on the MDT, then it does not need replay the
1127  * updates on that MDT, and only mark the sub transaction has been committed
1128  * there.
1129  *
1130  * \param[in] env       execution environment
1131  * \param[in] tdtd      target distribute txn data, which holds the replay list
1132  *                      and all parameters needed by replay process.
1133  * \param[in] dtrq      distribute txn replay req.
1134  *
1135  * \retval              0 if replay succeeds.
1136  * \retval              negative errno if replay failes.
1137  */
1138 int distribute_txn_replay_handle(struct lu_env *env,
1139                                  struct target_distribute_txn_data *tdtd,
1140                                  struct distribute_txn_replay_req *dtrq)
1141 {
1142         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1143         struct thandle_exec_args *ta;
1144         struct lu_context       session_env;
1145         struct thandle          *th = NULL;
1146         struct top_thandle      *top_th;
1147         struct top_multiple_thandle *tmt;
1148         struct thandle_update_records *tur = NULL;
1149         int                     i;
1150         int                     rc = 0;
1151         ENTRY;
1152
1153         /* initialize session, it is needed for the handler of target */
1154         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1155         if (rc) {
1156                 CERROR("%s: failure to initialize session: rc = %d\n",
1157                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1158                 RETURN(rc);
1159         }
1160         lu_context_enter(&session_env);
1161         env->le_ses = &session_env;
1162         lu_env_refill(env);
1163         update_records_dump(records, D_HA, true);
1164         th = top_trans_create(env, NULL);
1165         if (IS_ERR(th))
1166                 GOTO(exit_session, rc = PTR_ERR(th));
1167
1168         ta = &update_env_info(env)->uti_tea;
1169         ta->ta_argno = 0;
1170
1171         update_env_info(env)->uti_dtrq = dtrq;
1172         /* Create distribute transaction structure for this top thandle */
1173         top_th = container_of(th, struct top_thandle, tt_super);
1174         rc = top_trans_create_tmt(env, top_th);
1175         if (rc < 0)
1176                 GOTO(stop_trans, rc);
1177
1178         ta->ta_handle = th;
1179
1180         /* check if the distribute transaction has been committed */
1181         tmt = top_th->tt_multiple_thandle;
1182         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1183         tmt->tmt_batchid = records->ur_batchid;
1184         tgt_th_info(env)->tti_transno = records->ur_master_transno;
1185
1186         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1187                 tmt->tmt_committed = 1;
1188
1189         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1190         if (rc < 0)
1191                 GOTO(stop_trans, rc);
1192
1193         /* If no updates are needed to be replayed, then
1194          * mark this records as committed, so commit thread
1195          * distribute_txn_commit_thread() will delete the
1196          * record */
1197         if (ta->ta_argno == 0)
1198                 tmt->tmt_committed = 1;
1199
1200         tur = &update_env_info(env)->uti_tur;
1201         tur->tur_update_records = dtrq->dtrq_lur;
1202         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1203         tur->tur_update_params = NULL;
1204         tur->tur_update_param_count = 0;
1205         tmt->tmt_update_records = tur;
1206
1207         distribute_txn_insert_by_batchid(tmt);
1208         rc = top_trans_start(env, NULL, th);
1209         if (rc < 0)
1210                 GOTO(stop_trans, rc);
1211
1212         for (i = 0; i < ta->ta_argno; i++) {
1213                 struct tx_arg           *ta_arg;
1214                 struct dt_object        *dt_obj;
1215                 struct dt_device        *sub_dt;
1216                 struct sub_thandle      *st;
1217
1218                 ta_arg = ta->ta_args[i];
1219                 dt_obj = ta_arg->object;
1220
1221                 LASSERT(tmt->tmt_committed == 0);
1222                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1223                 st = lookup_sub_thandle(tmt, sub_dt);
1224                 LASSERT(st != NULL);
1225                 LASSERT(st->st_sub_th != NULL);
1226                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1227                                              ta->ta_args[i]);
1228                 if (unlikely(rc < 0)) {
1229                         CDEBUG(D_HA, "error during execution of #%u from"
1230                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1231                                ta->ta_args[i]->line, rc);
1232                         while (--i > 0) {
1233                                 if (ta->ta_args[i]->undo_fn != NULL) {
1234                                         dt_obj = ta->ta_args[i]->object;
1235                                         sub_dt =
1236                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1237                                         st = lookup_sub_thandle(tmt, sub_dt);
1238                                         LASSERT(st != NULL);
1239                                         LASSERT(st->st_sub_th != NULL);
1240
1241                                         ta->ta_args[i]->undo_fn(env,
1242                                                                st->st_sub_th,
1243                                                                ta->ta_args[i]);
1244                                 } else {
1245                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1246                                              dt_obd_name(ta->ta_handle->th_dev),
1247                                                ta->ta_args[i]->file,
1248                                                ta->ta_args[i]->line, -ENOTSUPP);
1249                                 }
1250                         }
1251                         break;
1252                 }
1253                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1254                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1255         }
1256
1257 stop_trans:
1258         if (rc < 0)
1259                 th->th_result = rc;
1260         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1261         for (i = 0; i < ta->ta_argno; i++) {
1262                 if (ta->ta_args[i]->object != NULL) {
1263                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1264                         ta->ta_args[i]->object = NULL;
1265                 }
1266         }
1267
1268         if (tur != NULL)
1269                 tur->tur_update_records = NULL;
1270 exit_session:
1271         lu_context_exit(&session_env);
1272         lu_context_fini(&session_env);
1273         RETURN(rc);
1274 }
1275 EXPORT_SYMBOL(distribute_txn_replay_handle);