Whamcloud - gitweb
LU-3540 lod: update recovery thread
[fs/lustre-release.git] / lustre / target / update_recovery.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_recovery.c
28  *
 * This file implements the methods that handle update recovery.
 *
 * During DNE recovery, the recovery thread redoes operations in transaction
 * number order. These replays come either from client replay requests or
 * from update replay records (for distributed transactions) in the update
 * log. For distributed transaction replay, the replay thread calls
 * distribute_txn_replay_handle() to handle the updates.
 *
 * After the master MDT restarts, it retrieves the update records from all
 * MDTs. For each distributed operation it checks the updates on all MDTs;
 * if update records are missing on some MDTs, the replay thread redoes the
 * updates on those MDTs.
41  *
42  * Author: Di Wang <di.wang@intel.com>
43  */
44 #define DEBUG_SUBSYSTEM S_CLASS
45
46 #include <lu_target.h>
47 #include <md_object.h>
48 #include <lustre_update.h>
49 #include <obd.h>
50 #include <obd_class.h>
51 #include "tgt_internal.h"
52
53 /**
54  * Lookup distribute_txn_replay req
55  *
56  * Lookup distribute_txn_replay in the replay list by batchid.
57  * It is assumed the list has been locked before calling this function.
58  *
59  * \param[in] tdtd      distribute_txn_data, which holds the replay
60  *                      list.
61  * \param[in] batchid   batchid used by lookup.
62  *
63  * \retval              pointer of the replay if succeeds.
64  * \retval              NULL if can not find it.
65  */
66 static struct distribute_txn_replay_req *
67 dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid)
68 {
69         struct distribute_txn_replay_req        *tmp;
70         struct distribute_txn_replay_req        *dtrq = NULL;
71
72         list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
73                 if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
74                         dtrq = tmp;
75                         break;
76                 }
77         }
78         return dtrq;
79 }
80
81 /**
82  * insert distribute txn replay req
83  *
84  * Insert distribute txn replay to the replay list, and it assumes the
85  * list has been looked. Note: the replay list is a sorted list, which
86  * is sorted by master transno. It is assumed the replay list has been
87  * locked before calling this function.
88  *
89  * \param[in] tdtd      target distribute txn data where replay list is
90  * \param[in] new       distribute txn replay to be inserted
91  *
92  * \retval              0 if insertion succeeds
93  * \retval              EEXIST if the dtrq already exists
94  */
95 static int dtrq_insert(struct target_distribute_txn_data *tdtd,
96                         struct distribute_txn_replay_req *new)
97 {
98         struct distribute_txn_replay_req *iter;
99
100         list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
101                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
102                     new->dtrq_lur->lur_update_rec.ur_master_transno)
103                         continue;
104
105                 /* If there are mulitple replay req with same transno, then
106                  * sort them with batchid */
107                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
108                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
109                     iter->dtrq_lur->lur_update_rec.ur_batchid ==
110                     new->dtrq_lur->lur_update_rec.ur_batchid)
111                         return -EEXIST;
112
113                 /* If there are mulitple replay req with same transno, then
114                  * sort them with batchid */
115                 if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
116                     new->dtrq_lur->lur_update_rec.ur_master_transno &&
117                     iter->dtrq_lur->lur_update_rec.ur_batchid >
118                     new->dtrq_lur->lur_update_rec.ur_batchid)
119                         continue;
120
121                 list_add(&new->dtrq_list, &iter->dtrq_list);
122                 break;
123         }
124
125         if (list_empty(&new->dtrq_list))
126                 list_add(&new->dtrq_list, &tdtd->tdtd_replay_list);
127
128         return 0;
129 }
130
131 /**
132  * create distribute txn replay req
133  *
134  * Allocate distribute txn replay req according to the update records.
135  *
136  * \param[in] tdtd      target distribute txn data where replay list is.
137  * \param[in] record    update records from the update log.
138  *
139  * \retval              the pointer of distribute txn replay req if
140  *                      the creation succeeds.
141  * \retval              NULL if the creation fails.
142  */
143 static struct distribute_txn_replay_req *
144 dtrq_create(struct llog_update_record *lur)
145 {
146         struct distribute_txn_replay_req *new;
147
148         OBD_ALLOC_PTR(new);
149         if (new == NULL)
150                 RETURN(ERR_PTR(-ENOMEM));
151
152         new->dtrq_lur_size = llog_update_record_size(lur);
153         OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size);
154         if (new->dtrq_lur == NULL) {
155                 OBD_FREE_PTR(new);
156                 RETURN(ERR_PTR(-ENOMEM));
157         }
158
159         memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
160
161         spin_lock_init(&new->dtrq_sub_list_lock);
162         INIT_LIST_HEAD(&new->dtrq_sub_list);
163         INIT_LIST_HEAD(&new->dtrq_list);
164
165         RETURN(new);
166 }
167
168 /**
169  * Lookup distribute sub replay
170  *
171  * Lookup distribute sub replay in the sub list of distribute_txn_replay by
172  * mdt_index.
173  *
174  * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup
175  * \param[in] mdt_index                 the mdt_index as the key of lookup
176  *
177  * \retval              the pointer of sub replay if it can be found.
178  * \retval              NULL if it can not find.
179  */
180 struct distribute_txn_replay_req_sub *
181 dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
182 {
183         struct distribute_txn_replay_req_sub *dtrqs = NULL;
184         struct distribute_txn_replay_req_sub *tmp;
185
186         list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
187                 if (tmp->dtrqs_mdt_index == mdt_index) {
188                         dtrqs = tmp;
189                         break;
190                 }
191         }
192         return dtrqs;
193 }
194
195 /**
196  * Insert distribute txn sub req replay
197  *
198  * Allocate sub replay req and insert distribute txn replay list.
199  *
200  * \param[in] dtrq      d to be added
201  * \param[in] cookie    the cookie of the update record
202  * \param[in] mdt_index the mdt_index of the update record
203  *
204  * \retval              0 if the adding succeeds.
205  * \retval              negative errno if the adding fails.
206  */
207 static int
208 dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
209                            struct llog_cookie *cookie,
210                            __u32 mdt_index)
211 {
212         struct distribute_txn_replay_req_sub *dtrqs = NULL;
213         struct distribute_txn_replay_req_sub *new;
214         ENTRY;
215
216         spin_lock(&dtrq->dtrq_sub_list_lock);
217         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
218         spin_unlock(&dtrq->dtrq_sub_list_lock);
219         if (dtrqs != NULL)
220                 RETURN(0);
221
222         OBD_ALLOC_PTR(new);
223         if (new == NULL)
224                 RETURN(-ENOMEM);
225
226         INIT_LIST_HEAD(&new->dtrqs_list);
227         new->dtrqs_mdt_index = mdt_index;
228         new->dtrqs_llog_cookie = *cookie;
229         spin_lock(&dtrq->dtrq_sub_list_lock);
230         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
231         if (dtrqs == NULL)
232                 list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
233         else
234                 OBD_FREE_PTR(new);
235         spin_unlock(&dtrq->dtrq_sub_list_lock);
236
237         RETURN(0);
238 }
239
240 /**
241  * Insert update records to the replay list.
242  *
243  * Allocate distribute txn replay req and insert it into the replay
244  * list, then insert the update records into the replay req.
245  *
246  * \param[in] tdtd      distribute txn replay data where the replay list
247  *                      is.
248  * \param[in] record    the update record
249  * \param[in] cookie    cookie of the record
250  * \param[in] index     mdt index of the record
251  *
252  * \retval              0 if the adding succeeds.
253  * \retval              negative errno if the adding fails.
254  */
255 int
256 insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
257                                      struct llog_update_record *lur,
258                                      struct llog_cookie *cookie,
259                                      __u32 mdt_index)
260 {
261         struct distribute_txn_replay_req *dtrq;
262         struct update_records            *record = &lur->lur_update_rec;
263         int rc = 0;
264         ENTRY;
265
266         CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
267                " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
268                record->ur_batchid, record->ur_master_transno, mdt_index);
269 again:
270         spin_lock(&tdtd->tdtd_replay_list_lock);
271         dtrq = dtrq_lookup(tdtd, record->ur_batchid);
272         spin_unlock(&tdtd->tdtd_replay_list_lock);
273         if (dtrq == NULL) {
274                 /* If the transno in the update record is 0, it means the
275                  * update are from master MDT, and we will use the master
276                  * last committed transno as its batchid. Note: if it got
277                  * the records from the slave later, it needs to update
278                  * the batchid by the transno in slave update log (see below) */
279                 dtrq = dtrq_create(lur);
280                 if (IS_ERR(dtrq))
281                         RETURN(PTR_ERR(dtrq));
282
283                 if (record->ur_master_transno == 0)
284                         dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
285                                 tdtd->tdtd_lut->lut_last_transno;
286                 spin_lock(&tdtd->tdtd_replay_list_lock);
287                 rc = dtrq_insert(tdtd, dtrq);
288                 spin_unlock(&tdtd->tdtd_replay_list_lock);
289         } else if (record->ur_master_transno != 0 &&
290                    dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
291                    record->ur_master_transno) {
292                 /* If the master transno in update header is not matched with
293                  * the one in the record, then it means the dtrq is originally
294                  * created by master record, and we need update master transno
295                  * and reposition the dtrq(by master transno). */
296                 dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
297                                                 record->ur_master_transno;
298                 list_del_init(&dtrq->dtrq_list);
299                 spin_lock(&tdtd->tdtd_replay_list_lock);
300                 rc = dtrq_insert(tdtd, dtrq);
301                 spin_unlock(&tdtd->tdtd_replay_list_lock);
302         }
303
304         if (rc == -EEXIST) {
305                 dtrq_destory(dtrq);
306                 rc = 0;
307                 goto again;
308         }
309
310         rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
311
312         RETURN(rc);
313 }
314 EXPORT_SYMBOL(insert_update_records_to_replay_list);
315
316 /**
317  * Dump updates of distribute txns.
318  *
319  * Output all of recovery updates in the distribute txn list to the
320  * debug log.
321  *
322  * \param[in] tdtd      distribute txn data where all of distribute txn
323  *                      are listed.
324  * \param[in] mask      debug mask
325  */
326 void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask)
327 {
328         struct distribute_txn_replay_req *dtrq;
329
330         spin_lock(&tdtd->tdtd_replay_list_lock);
331         list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list)
332                 update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask,
333                                     false);
334         spin_unlock(&tdtd->tdtd_replay_list_lock);
335 }
336 EXPORT_SYMBOL(dtrq_list_dump);
337
338 /**
339  * Destroy distribute txn replay req
340  *
341  * Destroy distribute txn replay req and all of subs.
342  *
343  * \param[in] dtrq      distribute txn replqy req to be destroyed.
344  */
345 void dtrq_destory(struct distribute_txn_replay_req *dtrq)
346 {
347         struct distribute_txn_replay_req_sub    *dtrqs;
348         struct distribute_txn_replay_req_sub    *tmp;
349
350         LASSERT(list_empty(&dtrq->dtrq_list));
351         spin_lock(&dtrq->dtrq_sub_list_lock);
352         list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
353                 list_del(&dtrqs->dtrqs_list);
354                 OBD_FREE_PTR(dtrqs);
355         }
356         spin_unlock(&dtrq->dtrq_sub_list_lock);
357
358         if (dtrq->dtrq_lur != NULL)
359                 OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
360
361         OBD_FREE_PTR(dtrq);
362 }
363 EXPORT_SYMBOL(dtrq_destory);
364
365 /**
366  * Destroy all of replay req.
367  *
368  * Destroy all of replay req in the replay list.
369  *
370  * \param[in] tdtd      target distribute txn data where the replay list is.
371  */
372 void dtrq_list_destroy(struct target_distribute_txn_data *tdtd)
373 {
374         struct distribute_txn_replay_req *dtrq;
375         struct distribute_txn_replay_req *tmp;
376
377         spin_lock(&tdtd->tdtd_replay_list_lock);
378         list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
379                                  dtrq_list) {
380                 list_del_init(&dtrq->dtrq_list);
381                 dtrq_destory(dtrq);
382         }
383         spin_unlock(&tdtd->tdtd_replay_list_lock);
384 }
385 EXPORT_SYMBOL(dtrq_list_destroy);
386
387 /**
388  * Get next req in the replay list
389  *
390  * Get next req needs to be replayed, since it is a sorted list
391  * (by master MDT transno)
392  *
393  * \param[in] tdtd      distribute txn data where the replay list is
394  *
395  * \retval              the pointer of update recovery header
396  */
397 struct distribute_txn_replay_req *
398 distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd)
399 {
400         struct distribute_txn_replay_req *dtrq = NULL;
401
402         spin_lock(&tdtd->tdtd_replay_list_lock);
403         if (!list_empty(&tdtd->tdtd_replay_list)) {
404                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
405                                  struct distribute_txn_replay_req, dtrq_list);
406                 list_del_init(&dtrq->dtrq_list);
407         }
408         spin_unlock(&tdtd->tdtd_replay_list_lock);
409
410         return dtrq;
411 }
412 EXPORT_SYMBOL(distribute_txn_get_next_req);
413
414 /**
415  * Get next transno in the replay list, because this is the sorted
416  * list, so it will return the transno of next req in the list.
417  *
418  * \param[in] tdtd      distribute txn data where the replay list is
419  *
420  * \retval              the transno of next update in the list
421  */
422 __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd)
423 {
424         struct distribute_txn_replay_req        *dtrq = NULL;
425         __u64                                   transno = 0;
426
427         spin_lock(&tdtd->tdtd_replay_list_lock);
428         if (!list_empty(&tdtd->tdtd_replay_list)) {
429                 dtrq = list_entry(tdtd->tdtd_replay_list.next,
430                                  struct distribute_txn_replay_req, dtrq_list);
431                 transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
432         }
433         spin_unlock(&tdtd->tdtd_replay_list_lock);
434
435         CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
436                tdtd->tdtd_lut->lut_obd->obd_name, transno);
437         return transno;
438 }
439 EXPORT_SYMBOL(distribute_txn_get_next_transno);
440
441 /**
442  * Check if the update of one object is committed
443  *
444  * Check whether the update for the object is committed by checking whether
445  * the correspondent sub exists in the replay req. If it is committed, mark
446  * the committed flag in correspondent the sub thandle.
447  *
448  * \param[in] env       execution environment
449  * \param[in] dtrq      replay request
450  * \param[in] dt_obj    object for the update
451  * \param[in] top_th    top thandle
452  * \param[in] sub_th    sub thandle which the update belongs to
453  *
454  * \retval              1 if the update is not committed.
455  * \retval              0 if the update is committed.
456  * \retval              negative errno if some other failures happen.
457  */
458 static int update_is_committed(const struct lu_env *env,
459                                struct distribute_txn_replay_req *dtrq,
460                                struct dt_object *dt_obj,
461                                struct top_thandle *top_th,
462                                struct sub_thandle *st)
463 {
464         struct seq_server_site  *seq_site;
465         const struct lu_fid     *fid = lu_object_fid(&dt_obj->do_lu);
466         struct distribute_txn_replay_req_sub    *dtrqs;
467         __u32                   mdt_index;
468         ENTRY;
469
470         if (st->st_sub_th != NULL)
471                 RETURN(1);
472
473         if (st->st_committed)
474                 RETURN(0);
475
476         seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site);
477         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
478                 mdt_index = fid_oid(fid);
479         } else if (!fid_seq_in_fldb(fid_seq(fid))) {
480                 mdt_index = seq_site->ss_node_id;
481         } else {
482                 struct lu_server_fld *fld;
483                 struct lu_seq_range range = {0};
484                 int rc;
485
486                 fld = seq_site->ss_server_fld;
487                 fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
488                 LASSERT(fld->lsf_seq_lookup != NULL);
489                 rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid),
490                                          &range);
491                 if (rc < 0)
492                         RETURN(rc);
493                 mdt_index = range.lsr_index;
494         }
495
496         dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
497         if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
498                 st->st_committed = 1;
499                 if (dtrqs != NULL)
500                         st->st_cookie = dtrqs->dtrqs_llog_cookie;
501                 RETURN(0);
502         }
503
504         CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid),
505                mdt_index);
506
507         RETURN(1);
508 }
509
510 /**
511  * Implementation of different update methods for update recovery.
512  *
513  * These following functions update_recovery_$(update_name) implement
514  * different updates recovery methods. They will extract the parameters
515  * from the common parameters area and call correspondent dt API to redo
516  * the update.
517  *
518  * \param[in] env       execution environment
519  * \param[in] op        update operation to be replayed
520  * \param[in] params    common update parameters which holds all parameters
521  *                      of the operation
522  * \param[in] th        transaction handle
523  * \param[in] declare   indicate it will do declare or real execution, true
524  *                      means declare, false means real execution
525  *
526  * \retval              0 if it succeeds.
527  * \retval              negative errno if it fails.
528  */
529 static int update_recovery_create(const struct lu_env *env,
530                                   struct dt_object *dt_obj,
531                                   const struct update_op *op,
532                                   const struct update_params *params,
533                                   struct thandle_exec_args *ta,
534                                   struct thandle *th)
535 {
536         struct update_thread_info *uti = update_env_info(env);
537         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
538         struct lu_attr          *attr = &uti->uti_attr;
539         struct obdo             *wobdo;
540         struct obdo             *lobdo = &uti->uti_obdo;
541         struct dt_object_format dof;
542         __u16                   size;
543         unsigned int            param_count;
544         int rc;
545         ENTRY;
546
547         if (dt_object_exists(dt_obj))
548                 RETURN(-EEXIST);
549
550         param_count = lur->lur_update_rec.ur_param_count;
551         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
552                                             param_count, &size);
553         if (wobdo == NULL)
554                 RETURN(-EIO);
555         if (size != sizeof(*wobdo))
556                 RETURN(-EIO);
557
558         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
559                 lustre_swab_obdo(wobdo);
560
561         lustre_get_wire_obdo(NULL, lobdo, wobdo);
562         la_from_obdo(attr, lobdo, lobdo->o_valid);
563
564         dof.dof_type = dt_mode_to_dft(attr->la_mode);
565
566         rc = out_tx_create(env, dt_obj, attr, NULL, &dof,
567                            ta, th, NULL, 0);
568
569         RETURN(rc);
570 }
571
572 static int update_recovery_destroy(const struct lu_env *env,
573                                    struct dt_object *dt_obj,
574                                    const struct update_op *op,
575                                    const struct update_params *params,
576                                    struct thandle_exec_args *ta,
577                                    struct thandle *th)
578 {
579         int rc;
580         ENTRY;
581
582         rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0);
583
584         RETURN(rc);
585 }
586
587 static int update_recovery_ref_add(const struct lu_env *env,
588                                    struct dt_object *dt_obj,
589                                    const struct update_op *op,
590                                    const struct update_params *params,
591                                    struct thandle_exec_args *ta,
592                                    struct thandle *th)
593 {
594         int rc;
595         ENTRY;
596
597         rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0);
598
599         RETURN(rc);
600 }
601
602 static int update_recovery_ref_del(const struct lu_env *env,
603                                    struct dt_object *dt_obj,
604                                    const struct update_op *op,
605                                    const struct update_params *params,
606                                    struct thandle_exec_args *ta,
607                                    struct thandle *th)
608 {
609         int rc;
610         ENTRY;
611
612         rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0);
613
614         RETURN(rc);
615 }
616
617 static int update_recovery_attr_set(const struct lu_env *env,
618                                     struct dt_object *dt_obj,
619                                     const struct update_op *op,
620                                     const struct update_params *params,
621                                     struct thandle_exec_args *ta,
622                                     struct thandle *th)
623 {
624         struct update_thread_info *uti = update_env_info(env);
625         struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur;
626         struct obdo     *wobdo;
627         struct obdo     *lobdo = &uti->uti_obdo;
628         struct lu_attr  *attr = &uti->uti_attr;
629         __u16           size;
630         unsigned int    param_count;
631         int             rc;
632         ENTRY;
633
634         param_count = lur->lur_update_rec.ur_param_count;
635         wobdo = update_params_get_param_buf(params, op->uop_params_off[0],
636                                             param_count, &size);
637         if (wobdo == NULL)
638                 RETURN(-EIO);
639         if (size != sizeof(*wobdo))
640                 RETURN(-EIO);
641
642         if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr))
643                 lustre_swab_obdo(wobdo);
644
645         lustre_get_wire_obdo(NULL, lobdo, wobdo);
646         la_from_obdo(attr, lobdo, lobdo->o_valid);
647
648         rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0);
649
650         RETURN(rc);
651 }
652
653 static int update_recovery_xattr_set(const struct lu_env *env,
654                                      struct dt_object *dt_obj,
655                                      const struct update_op *op,
656                                      const struct update_params *params,
657                                      struct thandle_exec_args *ta,
658                                      struct thandle *th)
659 {
660         struct update_thread_info *uti = update_env_info(env);
661         char            *buf;
662         char            *name;
663         int             fl;
664         __u16           size;
665         __u32           param_count;
666         int             rc;
667         ENTRY;
668
669         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
670         name = update_params_get_param_buf(params,
671                                            op->uop_params_off[0],
672                                            param_count, &size);
673         if (name == NULL)
674                 RETURN(-EIO);
675
676         buf = update_params_get_param_buf(params,
677                                           op->uop_params_off[1],
678                                           param_count, &size);
679         if (buf == NULL)
680                 RETURN(-EIO);
681
682         uti->uti_buf.lb_buf = buf;
683         uti->uti_buf.lb_len = (size_t)size;
684
685         buf = update_params_get_param_buf(params, op->uop_params_off[2],
686                                           param_count, &size);
687         if (buf == NULL)
688                 RETURN(-EIO);
689         if (size != sizeof(fl))
690                 RETURN(-EIO);
691
692         fl = le32_to_cpu(*(int *)buf);
693
694         rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th,
695                               NULL, 0);
696
697         RETURN(rc);
698 }
699
700 static int update_recovery_index_insert(const struct lu_env *env,
701                                         struct dt_object *dt_obj,
702                                         const struct update_op *op,
703                                         const struct update_params *params,
704                                         struct thandle_exec_args *ta,
705                                         struct thandle *th)
706 {
707         struct update_thread_info *uti = update_env_info(env);
708         struct lu_fid           *fid;
709         char                    *name;
710         __u32                   param_count;
711         __u32                   *ptype;
712         __u32                   type;
713         __u16                   size;
714         int rc;
715         ENTRY;
716
717         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
718         name = update_params_get_param_buf(params, op->uop_params_off[0],
719                                            param_count, &size);
720         if (name == NULL)
721                 RETURN(-EIO);
722
723         fid = update_params_get_param_buf(params, op->uop_params_off[1],
724                                           param_count, &size);
725         if (fid == NULL)
726                 RETURN(-EIO);
727         if (size != sizeof(*fid))
728                 RETURN(-EIO);
729
730         fid_le_to_cpu(fid, fid);
731
732         ptype = update_params_get_param_buf(params, op->uop_params_off[2],
733                                             param_count, &size);
734         if (ptype == NULL)
735                 RETURN(-EIO);
736         if (size != sizeof(*ptype))
737                 RETURN(-EIO);
738         type = le32_to_cpu(*ptype);
739
740         if (dt_try_as_dir(env, dt_obj) == 0)
741                 RETURN(-ENOTDIR);
742
743         uti->uti_rec.rec_fid = fid;
744         uti->uti_rec.rec_type = type;
745
746         rc = out_tx_index_insert(env, dt_obj,
747                                  (const struct dt_rec *)&uti->uti_rec,
748                                  (const struct dt_key *)name, ta, th,
749                                  NULL, 0);
750
751         RETURN(rc);
752 }
753
754 static int update_recovery_index_delete(const struct lu_env *env,
755                                         struct dt_object *dt_obj,
756                                         const struct update_op *op,
757                                         const struct update_params *params,
758                                         struct thandle_exec_args *ta,
759                                         struct thandle *th)
760 {
761         struct update_thread_info *uti = update_env_info(env);
762         __u32   param_count;
763         char    *name;
764         __u16   size;
765         int     rc;
766         ENTRY;
767
768         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
769         name = update_params_get_param_buf(params, op->uop_params_off[0],
770                                            param_count, &size);
771         if (name == NULL)
772                 RETURN(-EIO);
773
774         if (dt_try_as_dir(env, dt_obj) == 0)
775                 RETURN(-ENOTDIR);
776
777         rc = out_tx_index_delete(env, dt_obj,
778                                  (const struct dt_key *)name, ta, th, NULL, 0);
779
780         RETURN(rc);
781 }
782
783 static int update_recovery_write(const struct lu_env *env,
784                                  struct dt_object *dt_obj,
785                                  const struct update_op *op,
786                                  const struct update_params *params,
787                                  struct thandle_exec_args *ta,
788                                  struct thandle *th)
789 {
790         struct update_thread_info *uti = update_env_info(env);
791         char            *buf;
792         __u32           param_count;
793         __u64           pos;
794         __u16           size;
795         int rc;
796         ENTRY;
797
798         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
799         buf = update_params_get_param_buf(params, op->uop_params_off[0],
800                                           param_count, &size);
801         if (buf == NULL)
802                 RETURN(-EIO);
803
804         uti->uti_buf.lb_buf = buf;
805         uti->uti_buf.lb_len = size;
806
807         buf = update_params_get_param_buf(params, op->uop_params_off[1],
808                                           param_count, &size);
809         if (buf == NULL)
810                 RETURN(-EIO);
811
812         pos = le64_to_cpu(*(__u64 *)buf);
813
814         rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos,
815                           ta, th, NULL, 0);
816
817         RETURN(rc);
818 }
819
820 static int update_recovery_xattr_del(const struct lu_env *env,
821                                      struct dt_object *dt_obj,
822                                      const struct update_op *op,
823                                      const struct update_params *params,
824                                      struct thandle_exec_args *ta,
825                                      struct thandle *th)
826 {
827         struct update_thread_info *uti = update_env_info(env);
828         __u32   param_count;
829         char    *name;
830         __u16   size;
831         int     rc;
832         ENTRY;
833
834         param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count;
835         name = update_params_get_param_buf(params, op->uop_params_off[0],
836                                            param_count, &size);
837         if (name == NULL)
838                 RETURN(-EIO);
839
840         rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0);
841
842         RETURN(rc);
843 }
844
845 /**
846  * Execute updates in the update replay records
847  *
848  * Declare distribute txn replay by update records and add the updates
849  * to the execution list. Note: it will check if the update has been
850  * committed, and only execute the updates if it is not committed to
851  * disk.
852  *
853  * \param[in] env       execution environment
854  * \param[in] tdtd      distribute txn replay data which hold all of replay
855  *                      reqs and all replay parameters.
856  * \param[in] dtrq      distribute transaction replay req.
857  * \param[in] ta        thandle execute args.
858  *
859  * \retval              0 if declare succeeds.
860  * \retval              negative errno if declare fails.
861  */
862 static int update_recovery_exec(const struct lu_env *env,
863                                 struct target_distribute_txn_data *tdtd,
864                                 struct distribute_txn_replay_req *dtrq,
865                                 struct thandle_exec_args *ta)
866 {
867         struct llog_update_record *lur = dtrq->dtrq_lur;
868         struct update_records   *records = &lur->lur_update_rec;
869         struct update_ops       *ops = &records->ur_ops;
870         struct update_params    *params = update_records_get_params(records);
871         struct top_thandle      *top_th = container_of(ta->ta_handle,
872                                                        struct top_thandle,
873                                                        tt_super);
874         struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
875         struct update_op        *op;
876         unsigned int            i;
877         int                     rc = 0;
878         ENTRY;
879
880         /* These records have been swabbed in llog_cat_process() */
881         for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count;
882              i++, op = update_op_next_op(op)) {
883                 struct lu_fid           *fid = &op->uop_fid;
884                 struct dt_object        *dt_obj;
885                 struct dt_object        *sub_dt_obj;
886                 struct dt_device        *sub_dt;
887                 struct sub_thandle      *st;
888
889                 dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
890                 if (IS_ERR(dt_obj)) {
891                         rc = PTR_ERR(dt_obj);
892                         break;
893                 }
894                 sub_dt_obj = dt_object_child(dt_obj);
895
896                 /* Create sub thandle if not */
897                 sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev);
898                 st = lookup_sub_thandle(tmt, sub_dt);
899                 if (st == NULL) {
900                         st = create_sub_thandle(tmt, sub_dt);
901                         if (IS_ERR(st))
902                                 GOTO(next, rc = PTR_ERR(st));
903                 }
904
905                 /* check if updates on the OSD/OSP are committed */
906                 rc = update_is_committed(env, dtrq, dt_obj, top_th, st);
907                 if (rc == 0)
908                         /* If this is committed, goto next */
909                         goto next;
910
911                 if (rc < 0)
912                         GOTO(next, rc);
913
914                 /* Create thandle for sub thandle if needed */
915                 if (st->st_sub_th == NULL) {
916                         rc = sub_thandle_trans_create(env, top_th, st);
917                         if (rc != 0)
918                                 GOTO(next, rc);
919                 }
920
921                 CDEBUG(D_HA, "replay %uth update\n", i);
922                 switch (op->uop_type) {
923                 case OUT_CREATE:
924                         rc = update_recovery_create(env, sub_dt_obj,
925                                                     op, params, ta,
926                                                     st->st_sub_th);
927                         break;
928                 case OUT_DESTROY:
929                         rc = update_recovery_destroy(env, sub_dt_obj,
930                                                      op, params, ta,
931                                                      st->st_sub_th);
932                         break;
933                 case OUT_REF_ADD:
934                         rc = update_recovery_ref_add(env, sub_dt_obj,
935                                                      op, params, ta,
936                                                      st->st_sub_th);
937                         break;
938                 case OUT_REF_DEL:
939                         rc = update_recovery_ref_del(env, sub_dt_obj,
940                                                      op, params, ta,
941                                                      st->st_sub_th);
942                         break;
943                 case OUT_ATTR_SET:
944                         rc = update_recovery_attr_set(env, sub_dt_obj,
945                                                       op, params, ta,
946                                                       st->st_sub_th);
947                         break;
948                 case OUT_XATTR_SET:
949                         rc = update_recovery_xattr_set(env, sub_dt_obj,
950                                                        op, params, ta,
951                                                        st->st_sub_th);
952                         break;
953                 case OUT_INDEX_INSERT:
954                         rc = update_recovery_index_insert(env, sub_dt_obj,
955                                                           op, params, ta,
956                                                           st->st_sub_th);
957                         break;
958                 case OUT_INDEX_DELETE:
959                         rc = update_recovery_index_delete(env, sub_dt_obj,
960                                                           op, params, ta,
961                                                           st->st_sub_th);
962                         break;
963                 case OUT_WRITE:
964                         rc = update_recovery_write(env, sub_dt_obj,
965                                                    op, params, ta,
966                                                    st->st_sub_th);
967                         break;
968                 case OUT_XATTR_DEL:
969                         rc = update_recovery_xattr_del(env, sub_dt_obj,
970                                                        op, params, ta,
971                                                        st->st_sub_th);
972                         break;
973                 default:
974                         CERROR("Unknown update type %u\n", (__u32)op->uop_type);
975                         rc = -EINVAL;
976                         break;
977                 }
978 next:
979                 lu_object_put(env, &dt_obj->do_lu);
980                 if (rc < 0)
981                         break;
982         }
983
984         ta->ta_handle->th_result = rc;
985         RETURN(rc);
986 }
987
988 /**
989  * redo updates on MDT if needed.
990  *
991  * During DNE recovery, the recovery thread (target_recovery_thread) will call
992  * this function to replay distribute txn updates on all MDTs. It only replay
993  * updates on the MDT where the update record is missing.
994  *
995  * If the update already exists on the MDT, then it does not need replay the
996  * updates on that MDT, and only mark the sub transaction has been committed
997  * there.
998  *
999  * \param[in] env       execution environment
1000  * \param[in] tdtd      target distribute txn data, which holds the replay list
1001  *                      and all parameters needed by replay process.
1002  * \param[in] dtrq      distribute txn replay req.
1003  *
1004  * \retval              0 if replay succeeds.
1005  * \retval              negative errno if replay failes.
1006  */
1007 int distribute_txn_replay_handle(struct lu_env *env,
1008                                  struct target_distribute_txn_data *tdtd,
1009                                  struct distribute_txn_replay_req *dtrq)
1010 {
1011         struct update_records   *records = &dtrq->dtrq_lur->lur_update_rec;
1012         struct thandle_exec_args *ta;
1013         struct lu_context       session_env;
1014         struct thandle          *th = NULL;
1015         struct top_thandle      *top_th;
1016         struct top_multiple_thandle *tmt;
1017         struct thandle_update_records *tur = NULL;
1018         unsigned int            i;
1019         int                     rc = 0;
1020         ENTRY;
1021
1022         /* initialize session, it is needed for the handler of target */
1023         rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF);
1024         if (rc) {
1025                 CERROR("%s: failure to initialize session: rc = %d\n",
1026                        tdtd->tdtd_lut->lut_obd->obd_name, rc);
1027                 RETURN(rc);
1028         }
1029         lu_context_enter(&session_env);
1030         env->le_ses = &session_env;
1031         lu_env_refill(env);
1032         update_records_dump(records, D_HA, true);
1033         th = top_trans_create(env, NULL);
1034         if (IS_ERR(th))
1035                 GOTO(exit_session, rc = PTR_ERR(th));
1036
1037         ta = &update_env_info(env)->uti_tea;
1038         ta->ta_argno = 0;
1039
1040         update_env_info(env)->uti_dtrq = dtrq;
1041         /* Create distribute transaction structure for this top thandle */
1042         top_th = container_of(th, struct top_thandle, tt_super);
1043         rc = top_trans_create_tmt(env, top_th);
1044         if (rc < 0)
1045                 GOTO(stop_trans, rc);
1046
1047         ta->ta_handle = th;
1048
1049         /* check if the distribute transaction has been committed */
1050         tmt = top_th->tt_multiple_thandle;
1051         tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
1052         tmt->tmt_batchid = records->ur_batchid;
1053         tgt_th_info(env)->tti_transno = records->ur_master_transno;
1054
1055         if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
1056                 tmt->tmt_committed = 1;
1057
1058         rc = update_recovery_exec(env, tdtd, dtrq, ta);
1059         if (rc < 0)
1060                 GOTO(stop_trans, rc);
1061
1062         /* If no updates are needed to be replayed, then
1063          * mark this records as committed, so commit thread
1064          * distribute_txn_commit_thread() will delete the
1065          * record */
1066         if (ta->ta_argno == 0)
1067                 tmt->tmt_committed = 1;
1068
1069         tur = &update_env_info(env)->uti_tur;
1070         tur->tur_update_records = dtrq->dtrq_lur;
1071         tur->tur_update_records_buf_size = dtrq->dtrq_lur_size;
1072         tur->tur_update_params = NULL;
1073         tur->tur_update_param_count = 0;
1074         tmt->tmt_update_records = tur;
1075
1076         distribute_txn_insert_by_batchid(tmt);
1077         rc = top_trans_start(env, NULL, th);
1078         if (rc < 0)
1079                 GOTO(stop_trans, rc);
1080
1081         for (i = 0; i < ta->ta_argno; i++) {
1082                 struct tx_arg           *ta_arg;
1083                 struct dt_object        *dt_obj;
1084                 struct dt_device        *sub_dt;
1085                 struct sub_thandle      *st;
1086
1087                 ta_arg = ta->ta_args[i];
1088                 dt_obj = ta_arg->object;
1089
1090                 LASSERT(tmt->tmt_committed == 0);
1091                 sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
1092                 st = lookup_sub_thandle(tmt, sub_dt);
1093                 LASSERT(st != NULL);
1094                 LASSERT(st->st_sub_th != NULL);
1095                 rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
1096                                              ta->ta_args[i]);
1097                 if (unlikely(rc < 0)) {
1098                         CDEBUG(D_HA, "error during execution of #%u from"
1099                                " %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
1100                                ta->ta_args[i]->line, rc);
1101                         while (--i >= 0) {
1102                                 if (ta->ta_args[i]->undo_fn != NULL) {
1103                                         dt_obj = ta->ta_args[i]->object;
1104                                         sub_dt =
1105                                                 lu2dt_dev(dt_obj->do_lu.lo_dev);
1106                                         st = lookup_sub_thandle(tmt, sub_dt);
1107                                         LASSERT(st != NULL);
1108                                         LASSERT(st->st_sub_th != NULL);
1109
1110                                         ta->ta_args[i]->undo_fn(env,
1111                                                                st->st_sub_th,
1112                                                                ta->ta_args[i]);
1113                                 } else {
1114                                         CERROR("%s: undo for %s:%d: rc = %d\n",
1115                                              dt_obd_name(ta->ta_handle->th_dev),
1116                                                ta->ta_args[i]->file,
1117                                                ta->ta_args[i]->line, -ENOTSUPP);
1118                                 }
1119                         }
1120                         break;
1121                 }
1122                 CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n",
1123                        dt_obd_name(sub_dt), i, ta->ta_argno, rc);
1124         }
1125
1126 stop_trans:
1127         if (rc < 0)
1128                 th->th_result = rc;
1129         rc = top_trans_stop(env, tdtd->tdtd_dt, th);
1130         for (i = 0; i < ta->ta_argno; i++) {
1131                 if (ta->ta_args[i]->object != NULL) {
1132                         lu_object_put(env, &ta->ta_args[i]->object->do_lu);
1133                         ta->ta_args[i]->object = NULL;
1134                 }
1135         }
1136
1137         if (tur != NULL)
1138                 tur->tur_update_records = NULL;
1139 exit_session:
1140         lu_context_exit(&session_env);
1141         lu_context_fini(&session_env);
1142         RETURN(rc);
1143 }
1144 EXPORT_SYMBOL(distribute_txn_replay_handle);