Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lu_object.h>
40 #include <dt_object.h>
41 #include <lustre_fid.h>
42 #include <lustre_lib.h>
43 #include <lustre_net.h>
44 #include <md_object.h>
45 #include <obd_class.h>
46
47 #include "lfsck_internal.h"
48
/*
 * Magic numbers for the successive revisions (V1 .. V4) of the layout LFSCK
 * data; presumably the value kept in lfsck_layout::ll_magic -- TODO confirm.
 */
#define LFSCK_LAYOUT_MAGIC_V1		0xB173AE14
#define LFSCK_LAYOUT_MAGIC_V2		0xB1734D76
#define LFSCK_LAYOUT_MAGIC_V3		0xB17371B9
#define LFSCK_LAYOUT_MAGIC_V4		0xB1732FED

/* The magic used by this code: the most recent revision listed above. */
#define LFSCK_LAYOUT_MAGIC		LFSCK_LAYOUT_MAGIC_V4
55
56 struct lfsck_layout_seq {
57         struct list_head         lls_list;
58         __u64                    lls_seq;
59         __u64                    lls_lastid;
60         __u64                    lls_lastid_known;
61         struct dt_object        *lls_lastid_obj;
62         unsigned int             lls_dirty:1;
63 };
64
65 struct lfsck_layout_slave_target {
66         /* link into lfsck_layout_slave_data::llsd_master_list. */
67         struct list_head        llst_list;
68         /* The position for next record in the rbtree for iteration. */
69         struct lu_fid           llst_fid;
70         /* Dummy hash for iteration against the rbtree. */
71         __u64                   llst_hash;
72         __u64                   llst_gen;
73         atomic_t                llst_ref;
74         __u32                   llst_index;
75         /* How many times we have failed to get the master status. */
76         int                     llst_failures;
77 };
78
79 struct lfsck_layout_slave_data {
80         /* list for lfsck_layout_seq */
81         struct list_head         llsd_seq_list;
82
83         /* list for the masters involve layout verification. */
84         struct list_head         llsd_master_list;
85         spinlock_t               llsd_lock;
86         __u64                    llsd_touch_gen;
87         struct dt_object        *llsd_rb_obj;
88         struct rb_root           llsd_rb_root;
89         struct rw_semaphore      llsd_rb_rwsem;
90         unsigned int             llsd_rbtree_valid:1;
91 };
92
/*
 * Bundle of pointers handed to an asynchronous request issued by a slave:
 * the export used, the owning LFSCK component and the master it targets.
 */
struct lfsck_layout_slave_async_args {
	struct obd_export		 *llsaa_exp;
	struct lfsck_component		 *llsaa_com;
	struct lfsck_layout_slave_target *llsaa_llst;
};
98
99 static inline bool lfsck_comp_extent_aligned(__u64 size)
100 {
101          return (size & (LOV_MIN_STRIPE_SIZE - 1)) == 0;
102 }
103
104 static inline void
105 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
106 {
107         if (atomic_dec_and_test(&llst->llst_ref)) {
108                 LASSERT(list_empty(&llst->llst_list));
109
110                 OBD_FREE_PTR(llst);
111         }
112 }
113
114 static inline int
115 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
116 {
117         struct lfsck_layout_slave_target *llst;
118         struct lfsck_layout_slave_target *tmp;
119         int                               rc   = 0;
120
121         OBD_ALLOC_PTR(llst);
122         if (llst == NULL)
123                 return -ENOMEM;
124
125         INIT_LIST_HEAD(&llst->llst_list);
126         llst->llst_gen = 0;
127         llst->llst_index = index;
128         atomic_set(&llst->llst_ref, 1);
129
130         spin_lock(&llsd->llsd_lock);
131         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
132                 if (tmp->llst_index == index) {
133                         rc = -EALREADY;
134                         break;
135                 }
136         }
137         if (rc == 0)
138                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
139         spin_unlock(&llsd->llsd_lock);
140
141         if (rc != 0)
142                 OBD_FREE_PTR(llst);
143
144         return rc;
145 }
146
147 static inline void
148 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
149                       struct lfsck_layout_slave_target *llst)
150 {
151         bool del = false;
152
153         spin_lock(&llsd->llsd_lock);
154         if (!list_empty(&llst->llst_list)) {
155                 list_del_init(&llst->llst_list);
156                 del = true;
157         }
158         spin_unlock(&llsd->llsd_lock);
159
160         if (del)
161                 lfsck_layout_llst_put(llst);
162 }
163
164 static inline struct lfsck_layout_slave_target *
165 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
166                                __u32 index, bool unlink)
167 {
168         struct lfsck_layout_slave_target *llst;
169
170         spin_lock(&llsd->llsd_lock);
171         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
172                 if (llst->llst_index == index) {
173                         if (unlink)
174                                 list_del_init(&llst->llst_list);
175                         else
176                                 atomic_inc(&llst->llst_ref);
177                         spin_unlock(&llsd->llsd_lock);
178
179                         return llst;
180                 }
181         }
182         spin_unlock(&llsd->llsd_lock);
183
184         return NULL;
185 }
186
187 static struct lfsck_layout_req *
188 lfsck_layout_assistant_req_init(struct lfsck_assistant_object *lso,
189                                 struct dt_object *child, __u32 comp_id,
190                                 __u32 ost_idx, __u32 lov_idx)
191 {
192         struct lfsck_layout_req *llr;
193
194         OBD_ALLOC_PTR(llr);
195         if (llr == NULL)
196                 return ERR_PTR(-ENOMEM);
197
198         INIT_LIST_HEAD(&llr->llr_lar.lar_list);
199         llr->llr_lar.lar_parent = lfsck_assistant_object_get(lso);
200         llr->llr_child = child;
201         llr->llr_comp_id = comp_id;
202         llr->llr_ost_idx = ost_idx;
203         llr->llr_lov_idx = lov_idx;
204
205         return llr;
206 }
207
208 static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
209                                             struct lfsck_assistant_req *lar)
210 {
211         struct lfsck_layout_req *llr =
212                         container_of0(lar, struct lfsck_layout_req, llr_lar);
213
214         lfsck_object_put(env, llr->llr_child);
215         lfsck_assistant_object_put(env, lar->lar_parent);
216         OBD_FREE_PTR(llr);
217 }
218
219 static int
220 lfsck_layout_assistant_sync_failures_interpret(const struct lu_env *env,
221                                                struct ptlrpc_request *req,
222                                                void *args, int rc)
223 {
224         if (rc == 0) {
225                 struct lfsck_async_interpret_args *laia = args;
226                 struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
227
228                 ltd->ltd_synced_failures = 1;
229                 atomic_dec(laia->laia_count);
230         }
231
232         return 0;
233 }
234
235 /**
236  * Notify remote LFSCK instances about former failures.
237  *
238  * The local LFSCK instance has recorded which OSTs have ever failed to respond
239  * some LFSCK verification requests (maybe because of network issues or the OST
240  * itself trouble). During the respond gap, the OST may missed some OST-objects
241  * verification, then the OST cannot know whether related OST-objects have been
242  * referenced by related MDT-objects or not, then in the second-stage scanning,
243  * these OST-objects will be regarded as orphan, if the OST-object contains bad
244  * parent FID for back reference, then it will misguide the LFSCK to make wrong
245  * fixing for the fake orphan.
246  *
247  * To avoid above trouble, when layout LFSCK finishes the first-stage scanning,
248  * it will scan the bitmap for the ever failed OSTs, and notify them that they
249  * have ever missed some OST-object verification and should skip the handling
250  * for orphan OST-objects on all MDTs that are in the layout LFSCK.
251  *
252  * \param[in] env       pointer to the thread context
253  * \param[in] com       pointer to the lfsck component
254  * \param[in] lr        pointer to the lfsck request
255  */
256 static void lfsck_layout_assistant_sync_failures(const struct lu_env *env,
257                                                  struct lfsck_component *com,
258                                                  struct lfsck_request *lr)
259 {
260         struct lfsck_async_interpret_args *laia  =
261                                 &lfsck_env_info(env)->lti_laia2;
262         struct lfsck_assistant_data       *lad   = com->lc_data;
263         struct lfsck_layout               *lo    = com->lc_file_ram;
264         struct lfsck_instance             *lfsck = com->lc_lfsck;
265         struct lfsck_tgt_descs            *ltds  = &lfsck->li_ost_descs;
266         struct lfsck_tgt_desc             *ltd;
267         struct ptlrpc_request_set         *set;
268         atomic_t                           count;
269         __u32                              idx;
270         int                                rc    = 0;
271         ENTRY;
272
273         if (!test_bit(LAD_INCOMPLETE, &lad->lad_flags))
274                 RETURN_EXIT;
275
276         /* If the MDT has ever failed to verfiy some OST-objects,
277          * then sync failures with them firstly. */
278         lr->lr_flags2 = lo->ll_flags | LF_INCOMPLETE;
279
280         atomic_set(&count, 0);
281         memset(laia, 0, sizeof(*laia));
282         laia->laia_count = &count;
283         set = ptlrpc_prep_set();
284         if (set == NULL)
285                 GOTO(out, rc = -ENOMEM);
286
287         down_read(&ltds->ltd_rw_sem);
288         cfs_foreach_bit(lad->lad_bitmap, idx) {
289                 ltd = lfsck_ltd2tgt(ltds, idx);
290                 if (unlikely(!ltd))
291                         continue;
292
293                 laia->laia_ltd = ltd;
294                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
295                                 lfsck_layout_assistant_sync_failures_interpret,
296                                 laia, LFSCK_NOTIFY);
297                 if (rc != 0) {
298                         CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
299                                "notify target %x for %s phase1 done: "
300                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
301                                ltd->ltd_index, lad->lad_name, rc);
302
303                         break;
304                 }
305
306                 atomic_inc(&count);
307         }
308         up_read(&ltds->ltd_rw_sem);
309
310         if (rc == 0 && atomic_read(&count) > 0)
311                 rc = ptlrpc_set_wait(env, set);
312
313         ptlrpc_set_destroy(set);
314
315         if (rc == 0 && atomic_read(&count) > 0)
316                 rc = -EINVAL;
317
318         GOTO(out, rc);
319
320 out:
321         if (rc != 0)
322                 /* If failed to sync failures with the OSTs, then have to
323                  * mark the whole LFSCK as LF_INCOMPLETE to skip the whole
324                  * subsequent orphan OST-object handling. */
325                 lo->ll_flags |= LF_INCOMPLETE;
326
327         lr->lr_flags2 = lo->ll_flags;
328 }
329
330 static int lfsck_layout_verify_header_v1v3(struct dt_object *obj,
331                                            struct lov_mds_md_v1 *lmm,
332                                            __u64 start, __u32 comp_id)
333 {
334         __u32 magic;
335         __u32 pattern;
336
337         magic = le32_to_cpu(lmm->lmm_magic);
338         /* If magic crashed, keep it there. Sometime later, during OST-object
339          * orphan handling, if some OST-object(s) back-point to it, it can be
340          * verified and repaired. */
341         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
342                 int rc;
343
344                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
345                         rc = -EOPNOTSUPP;
346                 else
347                         rc = -EINVAL;
348
349                 CDEBUG(D_LFSCK, "%s LOV EA magic %u for the file "DFID"\n",
350                        rc == -EINVAL ? "Unknown" : "Unsupported",
351                        magic, PFID(lfsck_dto2fid(obj)));
352
353                 return rc;
354         }
355
356         pattern = le32_to_cpu(lmm->lmm_pattern);
357
358 #if 0
359         /* XXX: DoM file verification will be supportted via LU-11081. */
360         if (lov_pattern(pattern) == LOV_PATTERN_MDT) {
361                 if (start != 0) {
362                         CDEBUG(D_LFSCK, "The DoM entry for "DFID" is not "
363                                "the first component in the mirror %x/%llu\n",
364                                PFID(lfsck_dto2fid(obj)), comp_id, start);
365
366                         return -EINVAL;
367                 }
368         }
369 #endif
370
371         if (!lov_pattern_supported_normal_comp(lov_pattern(pattern))) {
372                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u for the file "
373                        DFID" in the component %x\n",
374                        pattern, PFID(lfsck_dto2fid(obj)), comp_id);
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
382 static int lfsck_layout_verify_header_foreign(struct dt_object *obj,
383                                               struct lov_foreign_md *lfm,
384                                               size_t len)
385 {
386         /* magic has been verified already */
387         __u32 value_len = le32_to_cpu(lfm->lfm_length);
388         /* type and flags are not checked for instance */
389
390         CDEBUG(D_INFO, "foreign LOV EA, magic %x, len %u, type %x, flags %x, for file "DFID"\n",
391                le32_to_cpu(lfm->lfm_magic), value_len,
392                le32_to_cpu(lfm->lfm_type), le32_to_cpu(lfm->lfm_flags),
393                PFID(lfsck_dto2fid(obj)));
394
395         if (len != value_len + offsetof(typeof(*lfm), lfm_value))
396                 CDEBUG(D_LFSCK, "foreign LOV EA internal size %u does not match EA full size %zu for file "DFID"\n",
397                        value_len, len, PFID(lfsck_dto2fid(obj)));
398
399         /* nothing to repair */
400         return -ENODATA;
401 }
402
403 static int lfsck_layout_verify_header(struct dt_object *obj,
404                                       struct lov_mds_md_v1 *lmm, size_t len)
405 {
406         int rc = 0;
407
408         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
409                 struct lov_comp_md_v1 *lcm = (struct lov_comp_md_v1 *)lmm;
410                 int i;
411                 __u16 count = le16_to_cpu(lcm->lcm_entry_count);
412
413                 if (unlikely(count == 0)) {
414                         CDEBUG(D_LFSCK, "the PFL file "DFID" contains invalid "
415                                "components count 0\n",
416                                PFID(lfsck_dto2fid(obj)));
417
418                         return -EINVAL;
419                 }
420
421                 for (i = 0; i < count && !rc; i++) {
422                         struct lov_comp_md_entry_v1 *lcme =
423                                                 &lcm->lcm_entries[i];
424                         __u64 start = le64_to_cpu(lcme->lcme_extent.e_start);
425                         __u64 end = le64_to_cpu(lcme->lcme_extent.e_end);
426                         __u32 comp_id = le32_to_cpu(lcme->lcme_id);
427
428                         if (unlikely(comp_id == LCME_ID_INVAL ||
429                                      comp_id > LCME_ID_MAX)) {
430                                 CDEBUG(D_LFSCK, "found invalid FPL ID %u "
431                                        "for the file "DFID" at idx %d\n",
432                                        comp_id, PFID(lfsck_dto2fid(obj)), i);
433
434                                 return -EINVAL;
435                         }
436
437                         if (unlikely(start >= end ||
438                                      !lfsck_comp_extent_aligned(start) ||
439                                      (!lfsck_comp_extent_aligned(end) &&
440                                       end != LUSTRE_EOF))) {
441                                 CDEBUG(D_LFSCK, "found invalid FPL extent "
442                                        "range [%llu - %llu) for the file "
443                                        DFID" at idx %d\n",
444                                        start, end, PFID(lfsck_dto2fid(obj)), i);
445
446                                 return -EINVAL;
447                         }
448
449                         rc = lfsck_layout_verify_header_v1v3(obj,
450                                         (struct lov_mds_md_v1 *)((char *)lmm +
451                                         le32_to_cpu(lcme->lcme_offset)), start,
452                                         comp_id);
453                 }
454         } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_FOREIGN) {
455                 rc = lfsck_layout_verify_header_foreign(obj,
456                                                 (struct lov_foreign_md *)lmm,
457                                                 len);
458         } else {
459                 rc = lfsck_layout_verify_header_v1v3(obj, lmm, 1, 0);
460         }
461
462         return rc;
463 }
464
465 static int lfsck_layout_get_lovea(const struct lu_env *env,
466                                   struct dt_object *obj, struct lu_buf *buf)
467 {
468         int rc;
469         int rc1;
470
471 again:
472         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV);
473         if (rc == -ERANGE) {
474                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV);
475                 if (rc <= 0)
476                         return !rc ? -ENODATA : rc;
477
478                 lu_buf_realloc(buf, rc);
479                 if (buf->lb_buf == NULL)
480                         return -ENOMEM;
481
482                 goto again;
483         }
484
485         if (rc <= 0)
486                 return !rc ? -ENODATA : rc;
487
488         if (unlikely(buf->lb_buf == NULL)) {
489                 lu_buf_alloc(buf, rc);
490                 if (buf->lb_buf == NULL)
491                         return -ENOMEM;
492
493                 goto again;
494         }
495
496         rc1 = lfsck_layout_verify_header(obj, buf->lb_buf, rc);
497
498         return rc1 ? rc1 : rc;
499 }
500
/* Size in bytes of each bitmap hanging off a struct lfsck_rbtree_node. */
#define LFSCK_RBTREE_BITMAP_SIZE	PAGE_SIZE
/* Number of bits, i.e. of consecutive OIDs, covered by one bitmap. */
#define LFSCK_RBTREE_BITMAP_WIDTH	(LFSCK_RBTREE_BITMAP_SIZE << 3)
/* Mask extracting the position of an OID inside its bitmap. */
#define LFSCK_RBTREE_BITMAP_MASK	(LFSCK_RBTREE_BITMAP_WIDTH - 1)
504
505 struct lfsck_rbtree_node {
506         struct rb_node   lrn_node;
507         __u64            lrn_seq;
508         __u32            lrn_first_oid;
509         atomic_t         lrn_known_count;
510         atomic_t         lrn_accessed_count;
511         void            *lrn_known_bitmap;
512         void            *lrn_accessed_bitmap;
513 };
514
515 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
516                                    __u64 seq, __u32 oid)
517 {
518         if (seq < lrn->lrn_seq)
519                 return -1;
520
521         if (seq > lrn->lrn_seq)
522                 return 1;
523
524         if (oid < lrn->lrn_first_oid)
525                 return -1;
526
527         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
528                 return 1;
529
530         return 0;
531 }
532
533 /* The caller should hold llsd->llsd_rb_lock. */
534 static struct lfsck_rbtree_node *
535 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
536                     const struct lu_fid *fid, bool *exact)
537 {
538         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
539         struct rb_node           *prev  = NULL;
540         struct lfsck_rbtree_node *lrn   = NULL;
541         int                       rc    = 0;
542
543         if (exact != NULL)
544                 *exact = true;
545
546         while (node != NULL) {
547                 prev = node;
548                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
549                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
550                 if (rc < 0)
551                         node = node->rb_left;
552                 else if (rc > 0)
553                         node = node->rb_right;
554                 else
555                         return lrn;
556         }
557
558         if (exact == NULL)
559                 return NULL;
560
561         /* If there is no exactly matched one, then to the next valid one. */
562         *exact = false;
563
564         /* The rbtree is empty. */
565         if (rc == 0)
566                 return NULL;
567
568         if (rc < 0)
569                 return lrn;
570
571         node = rb_next(prev);
572
573         /* The end of the rbtree. */
574         if (node == NULL)
575                 return NULL;
576
577         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
578
579         return lrn;
580 }
581
582 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
583                                                   const struct lu_fid *fid)
584 {
585         struct lfsck_rbtree_node *lrn;
586
587         OBD_ALLOC_PTR(lrn);
588         if (lrn == NULL)
589                 return ERR_PTR(-ENOMEM);
590
591         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
592         if (lrn->lrn_known_bitmap == NULL) {
593                 OBD_FREE_PTR(lrn);
594
595                 return ERR_PTR(-ENOMEM);
596         }
597
598         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
599         if (lrn->lrn_accessed_bitmap == NULL) {
600                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
601                 OBD_FREE_PTR(lrn);
602
603                 return ERR_PTR(-ENOMEM);
604         }
605
606         RB_CLEAR_NODE(&lrn->lrn_node);
607         lrn->lrn_seq = fid_seq(fid);
608         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
609         atomic_set(&lrn->lrn_known_count, 0);
610         atomic_set(&lrn->lrn_accessed_count, 0);
611
612         return lrn;
613 }
614
615 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
616 {
617         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
618         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
619         OBD_FREE_PTR(lrn);
620 }
621
622 /* The caller should hold lock. */
623 static struct lfsck_rbtree_node *
624 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
625                     struct lfsck_rbtree_node *lrn)
626 {
627         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
628         struct rb_node            *parent = NULL;
629         struct lfsck_rbtree_node  *tmp;
630         int                        rc;
631
632         while (*pos != NULL) {
633                 parent = *pos;
634                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
635                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
636                 if (rc < 0)
637                         pos = &(*pos)->rb_left;
638                 else if (rc > 0)
639                         pos = &(*pos)->rb_right;
640                 else
641                         return tmp;
642         }
643
644         rb_link_node(&lrn->lrn_node, parent, pos);
645         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
646
647         return lrn;
648 }
649
650 extern const struct dt_index_operations lfsck_orphan_index_ops;
651
652 static int lfsck_rbtree_setup(const struct lu_env *env,
653                               struct lfsck_component *com)
654 {
655         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
656         struct lfsck_instance           *lfsck  = com->lc_lfsck;
657         struct dt_device                *dev    = lfsck->li_bottom;
658         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
659         struct dt_object                *obj;
660
661         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
662         fid->f_oid = lfsck_dev_idx(lfsck);
663         fid->f_ver = 0;
664         obj = dt_locate(env, dev, fid);
665         if (IS_ERR(obj))
666                 RETURN(PTR_ERR(obj));
667
668         /* Generate an in-RAM object to stand for the layout rbtree.
669          * Scanning the layout rbtree will be via the iteration over
670          * the object. In the future, the rbtree may be written onto
671          * disk with the object.
672          *
673          * Mark the object to be as exist. */
674         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
675         obj->do_index_ops = &lfsck_orphan_index_ops;
676         llsd->llsd_rb_obj = obj;
677         llsd->llsd_rbtree_valid = 1;
678         dev->dd_record_fid_accessed = 1;
679
680         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
681                lfsck_lfsck2name(lfsck));
682
683         return 0;
684 }
685
686 static void lfsck_rbtree_cleanup(const struct lu_env *env,
687                                  struct lfsck_component *com)
688 {
689         struct lfsck_instance           *lfsck = com->lc_lfsck;
690         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
691         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
692         struct rb_node                  *next;
693         struct lfsck_rbtree_node        *lrn;
694
695         lfsck->li_bottom->dd_record_fid_accessed = 0;
696         /* Invalid the rbtree, then no others will use it. */
697         down_write(&llsd->llsd_rb_rwsem);
698         llsd->llsd_rbtree_valid = 0;
699         up_write(&llsd->llsd_rb_rwsem);
700
701         while (node != NULL) {
702                 next = rb_next(node);
703                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
704                 rb_erase(node, &llsd->llsd_rb_root);
705                 lfsck_rbtree_free(lrn);
706                 node = next;
707         }
708
709         if (llsd->llsd_rb_obj != NULL) {
710                 lfsck_object_put(env, llsd->llsd_rb_obj);
711                 llsd->llsd_rb_obj = NULL;
712         }
713
714         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
715                lfsck_lfsck2name(lfsck));
716 }
717
718 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
719                                        struct lfsck_component *com,
720                                        const struct lu_fid *fid,
721                                        bool accessed)
722 {
723         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
724         struct lfsck_rbtree_node        *lrn;
725         bool                             insert = false;
726         int                              idx;
727         int                              rc     = 0;
728         ENTRY;
729
730         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
731                 RETURN_EXIT;
732
733         if (!fid_is_idif(fid) && !fid_is_norm(fid))
734                 RETURN_EXIT;
735
736         down_read(&llsd->llsd_rb_rwsem);
737         if (!llsd->llsd_rbtree_valid)
738                 GOTO(unlock, rc = 0);
739
740         lrn = lfsck_rbtree_search(llsd, fid, NULL);
741         if (lrn == NULL) {
742                 struct lfsck_rbtree_node *tmp;
743
744                 LASSERT(!insert);
745
746                 up_read(&llsd->llsd_rb_rwsem);
747                 tmp = lfsck_rbtree_new(env, fid);
748                 if (IS_ERR(tmp))
749                         GOTO(out, rc = PTR_ERR(tmp));
750
751                 insert = true;
752                 down_write(&llsd->llsd_rb_rwsem);
753                 if (!llsd->llsd_rbtree_valid) {
754                         lfsck_rbtree_free(tmp);
755                         GOTO(unlock, rc = 0);
756                 }
757
758                 lrn = lfsck_rbtree_insert(llsd, tmp);
759                 if (lrn != tmp)
760                         lfsck_rbtree_free(tmp);
761         }
762
763         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
764         /* Any accessed object must be a known object. */
765         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
766                 atomic_inc(&lrn->lrn_known_count);
767         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
768                 atomic_inc(&lrn->lrn_accessed_count);
769
770         GOTO(unlock, rc = 0);
771
772 unlock:
773         if (insert)
774                 up_write(&llsd->llsd_rb_rwsem);
775         else
776                 up_read(&llsd->llsd_rb_rwsem);
777 out:
778         if (rc != 0 && accessed) {
779                 struct lfsck_layout *lo = com->lc_file_ram;
780
781                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
782                        "bitmap, and will cause incorrect LFSCK OST-object "
783                        "handling, so disable it to cancel orphan handling "
784                        "for related device. rc = %d\n",
785                        lfsck_lfsck2name(com->lc_lfsck), rc);
786
787                 lo->ll_flags |= LF_INCOMPLETE;
788                 lfsck_rbtree_cleanup(env, com);
789         }
790 }
791
792 static inline void lldk_le_to_cpu(struct lfsck_layout_dangling_key *des,
793                                   const struct lfsck_layout_dangling_key *src)
794 {
795         fid_le_to_cpu(&des->lldk_fid, &src->lldk_fid);
796         des->lldk_comp_id = le32_to_cpu(src->lldk_comp_id);
797         des->lldk_ea_off = le32_to_cpu(src->lldk_ea_off);
798 }
799
800 static inline void lldk_cpu_to_le(struct lfsck_layout_dangling_key *des,
801                                   const struct lfsck_layout_dangling_key *src)
802 {
803         fid_cpu_to_le(&des->lldk_fid, &src->lldk_fid);
804         des->lldk_comp_id = cpu_to_le32(src->lldk_comp_id);
805         des->lldk_ea_off = cpu_to_le32(src->lldk_ea_off);
806 }
807
808 static inline void lldk_be_to_cpu(struct lfsck_layout_dangling_key *des,
809                                   const struct lfsck_layout_dangling_key *src)
810 {
811         fid_be_to_cpu(&des->lldk_fid, &src->lldk_fid);
812         des->lldk_comp_id = be32_to_cpu(src->lldk_comp_id);
813         des->lldk_ea_off = be32_to_cpu(src->lldk_ea_off);
814 }
815
816 static inline void lldk_cpu_to_be(struct lfsck_layout_dangling_key *des,
817                                   const struct lfsck_layout_dangling_key *src)
818 {
819         fid_cpu_to_be(&des->lldk_fid, &src->lldk_fid);
820         des->lldk_comp_id = cpu_to_be32(src->lldk_comp_id);
821         des->lldk_ea_off = cpu_to_be32(src->lldk_ea_off);
822 }
823
824 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
825                                    const struct lfsck_layout *src)
826 {
827         int i;
828
829         des->ll_magic = le32_to_cpu(src->ll_magic);
830         des->ll_status = le32_to_cpu(src->ll_status);
831         des->ll_flags = le32_to_cpu(src->ll_flags);
832         des->ll_success_count = le32_to_cpu(src->ll_success_count);
833         des->ll_run_time_phase1 = le64_to_cpu(src->ll_run_time_phase1);
834         des->ll_run_time_phase2 = le64_to_cpu(src->ll_run_time_phase2);
835         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
836         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
837         des->ll_time_last_checkpoint =
838                                 le64_to_cpu(src->ll_time_last_checkpoint);
839         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
840         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
841         des->ll_pos_first_inconsistent =
842                         le64_to_cpu(src->ll_pos_first_inconsistent);
843         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
844         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
845         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
846         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
847         for (i = 0; i < LLIT_MAX; i++)
848                 des->ll_objs_repaired[i] =
849                                 le64_to_cpu(src->ll_objs_repaired[i]);
850         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
851         des->ll_bitmap_size = le32_to_cpu(src->ll_bitmap_size);
852         lldk_le_to_cpu(&des->ll_lldk_latest_scanned_phase2,
853                        &src->ll_lldk_latest_scanned_phase2);
854 }
855
856 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
857                                    const struct lfsck_layout *src)
858 {
859         int i;
860
861         des->ll_magic = cpu_to_le32(src->ll_magic);
862         des->ll_status = cpu_to_le32(src->ll_status);
863         des->ll_flags = cpu_to_le32(src->ll_flags);
864         des->ll_success_count = cpu_to_le32(src->ll_success_count);
865         des->ll_run_time_phase1 = cpu_to_le64(src->ll_run_time_phase1);
866         des->ll_run_time_phase2 = cpu_to_le64(src->ll_run_time_phase2);
867         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
868         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
869         des->ll_time_last_checkpoint =
870                                 cpu_to_le64(src->ll_time_last_checkpoint);
871         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
872         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
873         des->ll_pos_first_inconsistent =
874                         cpu_to_le64(src->ll_pos_first_inconsistent);
875         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
876         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
877         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
878         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
879         for (i = 0; i < LLIT_MAX; i++)
880                 des->ll_objs_repaired[i] =
881                                 cpu_to_le64(src->ll_objs_repaired[i]);
882         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
883         des->ll_bitmap_size = cpu_to_le32(src->ll_bitmap_size);
884         lldk_cpu_to_le(&des->ll_lldk_latest_scanned_phase2,
885                        &src->ll_lldk_latest_scanned_phase2);
886 }
887
888 /**
889  * Load the OST bitmap from the lfsck_layout trace file.
890  *
891  * \param[in] env       pointer to the thread context
892  * \param[in] com       pointer to the lfsck component
893  *
894  * \retval              0 for success
895  * \retval              negative error number on failure or data corruption
896  */
897 static int lfsck_layout_load_bitmap(const struct lu_env *env,
898                                     struct lfsck_component *com)
899 {
900         struct dt_object                *obj    = com->lc_obj;
901         struct lfsck_assistant_data     *lad    = com->lc_data;
902         struct lfsck_layout             *lo     = com->lc_file_ram;
903         struct cfs_bitmap                       *bitmap = lad->lad_bitmap;
904         loff_t                           pos    = com->lc_file_size;
905         ssize_t                          size;
906         __u32                            nbits;
907         int                              rc;
908         ENTRY;
909
910         if (com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size >
911             lo->ll_bitmap_size)
912                 nbits = com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size;
913         else
914                 nbits = lo->ll_bitmap_size;
915
916         if (unlikely(nbits < BITS_PER_LONG))
917                 nbits = BITS_PER_LONG;
918
919         if (nbits > bitmap->size) {
920                 __u32 new_bits = bitmap->size;
921                 struct cfs_bitmap *new_bitmap;
922
923                 while (new_bits < nbits)
924                         new_bits <<= 1;
925
926                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
927                 if (new_bitmap == NULL)
928                         RETURN(-ENOMEM);
929
930                 lad->lad_bitmap = new_bitmap;
931                 CFS_FREE_BITMAP(bitmap);
932                 bitmap = new_bitmap;
933         }
934
935         if (lo->ll_bitmap_size == 0) {
936                 clear_bit(LAD_INCOMPLETE, &lad->lad_flags);
937                 CFS_RESET_BITMAP(bitmap);
938
939                 RETURN(0);
940         }
941
942         size = (lo->ll_bitmap_size + 7) >> 3;
943         rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
944         if (rc != size)
945                 RETURN(rc >= 0 ? -EINVAL : rc);
946
947         if (cfs_bitmap_check_empty(bitmap))
948                 clear_bit(LAD_INCOMPLETE, &lad->lad_flags);
949         else
950                 set_bit(LAD_INCOMPLETE, &lad->lad_flags);
951
952         RETURN(0);
953 }
954
955 /**
956  * Load the layout LFSCK trace file from disk.
957  *
958  * The layout LFSCK trace file records the layout LFSCK status information
959  * and other statistics, such as how many objects have been scanned, and how
960  * many objects have been repaired, and etc. It also contains the bitmap for
961  * failed OSTs during the layout LFSCK. All these information will be loaded
962  * from disk to RAM when the layout LFSCK component setup.
963  *
964  * \param[in] env       pointer to the thread context
965  * \param[in] com       pointer to the lfsck component
966  *
967  * \retval              positive number for file data corruption, the caller
968  *                      should reset the layout LFSCK trace file
969  * \retval              0 for success
970  * \retval              negative error number on failure
971  */
972 static int lfsck_layout_load(const struct lu_env *env,
973                              struct lfsck_component *com)
974 {
975         struct lfsck_layout             *lo     = com->lc_file_ram;
976         ssize_t                          size   = com->lc_file_size;
977         loff_t                           pos    = 0;
978         int                              rc;
979
980         rc = dt_read(env, com->lc_obj,
981                      lfsck_buf_get(env, com->lc_file_disk, size), &pos);
982         if (rc == 0) {
983                 return -ENOENT;
984         } else if (rc < 0) {
985                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
986                        lfsck_lfsck2name(com->lc_lfsck), rc);
987                 return rc;
988         } else if (rc != size) {
989                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
990                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
991                 return 1;
992         }
993
994         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
995         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
996                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
997                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
998                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
999                 return 1;
1000         }
1001
1002         return 0;
1003 }
1004
1005 /**
1006  * Store the layout LFSCK trace file on disk.
1007  *
1008  * The layout LFSCK trace file records the layout LFSCK status information
1009  * and other statistics, such as how many objects have been scanned, and how
1010  * many objects have been repaired, and etc. It also contains the bitmap for
1011  * failed OSTs during the layout LFSCK. All these information will be synced
1012  * from RAM to disk periodically.
1013  *
1014  * \param[in] env       pointer to the thread context
1015  * \param[in] com       pointer to the lfsck component
1016  *
1017  * \retval              0 for success
1018  * \retval              negative error number on failure
1019  */
1020 static int lfsck_layout_store(const struct lu_env *env,
1021                               struct lfsck_component *com)
1022 {
1023         struct dt_object        *obj    = com->lc_obj;
1024         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1025         struct lfsck_layout     *lo_ram = com->lc_file_ram;
1026         struct lfsck_layout     *lo     = com->lc_file_disk;
1027         struct thandle          *th;
1028         struct dt_device        *dev    = lfsck_obj2dev(obj);
1029         struct cfs_bitmap       *bitmap = NULL;
1030         loff_t                   pos;
1031         ssize_t                  size   = com->lc_file_size;
1032         __u32                    nbits  = 0;
1033         int                      rc;
1034         ENTRY;
1035
1036         if (lfsck->li_master) {
1037                 struct lfsck_assistant_data *lad = com->lc_data;
1038
1039                 bitmap = lad->lad_bitmap;
1040                 nbits = bitmap->size;
1041
1042                 LASSERT(nbits > 0);
1043                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
1044         }
1045
1046         lo_ram->ll_bitmap_size = nbits;
1047         lfsck_layout_cpu_to_le(lo, lo_ram);
1048         th = dt_trans_create(env, dev);
1049         if (IS_ERR(th))
1050                 GOTO(log, rc = PTR_ERR(th));
1051
1052         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
1053                                      (loff_t)0, th);
1054         if (rc != 0)
1055                 GOTO(out, rc);
1056
1057         if (bitmap != NULL) {
1058                 rc = dt_declare_record_write(env, obj,
1059                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
1060                                 (loff_t)size, th);
1061                 if (rc != 0)
1062                         GOTO(out, rc);
1063         }
1064
1065         rc = dt_trans_start_local(env, dev, th);
1066         if (rc != 0)
1067                 GOTO(out, rc);
1068
1069         pos = 0;
1070         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos, th);
1071         if (rc != 0)
1072                 GOTO(out, rc);
1073
1074         if (bitmap != NULL) {
1075                 pos = size;
1076                 rc = dt_record_write(env, obj,
1077                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
1078                                 &pos, th);
1079         }
1080
1081         GOTO(out, rc);
1082
1083 out:
1084         dt_trans_stop(env, dev, th);
1085
1086 log:
1087         if (rc != 0)
1088                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
1089                        lfsck_lfsck2name(lfsck), rc);
1090
1091         return rc;
1092 }
1093
1094 static int lfsck_layout_init(const struct lu_env *env,
1095                              struct lfsck_component *com)
1096 {
1097         struct lfsck_layout *lo = com->lc_file_ram;
1098         int rc;
1099
1100         memset(lo, 0, com->lc_file_size);
1101         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1102         lo->ll_status = LS_INIT;
1103         down_write(&com->lc_sem);
1104         rc = lfsck_layout_store(env, com);
1105         if (rc == 0 && com->lc_lfsck->li_master)
1106                 rc = lfsck_load_sub_trace_files(env, com,
1107                         &dt_lfsck_layout_dangling_features, LFSCK_LAYOUT, true);
1108         up_write(&com->lc_sem);
1109
1110         return rc;
1111 }
1112
1113 static int fid_is_for_ostobj(const struct lu_env *env,
1114                              struct lfsck_instance *lfsck,
1115                              struct dt_object *obj, const struct lu_fid *fid)
1116 {
1117         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1118         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
1119         struct lustre_ost_attrs *loa;
1120         int                      rc;
1121
1122         fld_range_set_any(range);
1123         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1124         if (rc == 0) {
1125                 if (fld_range_is_ost(range))
1126                         return 1;
1127
1128                 return 0;
1129         }
1130
1131         loa = &lfsck_env_info(env)->lti_loa;
1132         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, loa, sizeof(*loa)),
1133                           XATTR_NAME_LMA);
1134         if (rc >= (int)sizeof(struct lustre_mdt_attrs)) {
1135                 lustre_lma_swab(&loa->loa_lma);
1136
1137                 return loa->loa_lma.lma_compat & LMAC_FID_ON_OST ? 1 : 0;
1138         }
1139
1140         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID);
1141
1142         return rc > 0;
1143 }
1144
1145 static struct lfsck_layout_seq *
1146 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
1147 {
1148         struct lfsck_layout_seq *lls;
1149
1150         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1151                 if (lls->lls_seq == seq)
1152                         return lls;
1153
1154                 if (lls->lls_seq > seq)
1155                         return NULL;
1156         }
1157
1158         return NULL;
1159 }
1160
1161 static void
1162 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
1163                         struct lfsck_layout_seq *lls)
1164 {
1165         struct lfsck_layout_seq *tmp;
1166         struct list_head        *pos = &llsd->llsd_seq_list;
1167
1168         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
1169                 if (lls->lls_seq < tmp->lls_seq) {
1170                         pos = &tmp->lls_list;
1171                         break;
1172                 }
1173         }
1174         list_add_tail(&lls->lls_list, pos);
1175 }
1176
1177 static int
1178 lfsck_layout_lastid_create(const struct lu_env *env,
1179                            struct lfsck_instance *lfsck,
1180                            struct dt_object *obj)
1181 {
1182         struct lfsck_thread_info *info   = lfsck_env_info(env);
1183         struct lu_attr           *la     = &info->lti_la;
1184         struct dt_object_format  *dof    = &info->lti_dof;
1185         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1186         struct dt_device         *dt     = lfsck_obj2dev(obj);
1187         struct thandle           *th;
1188         __u64                     lastid = 0;
1189         loff_t                    pos    = 0;
1190         int                       rc;
1191         ENTRY;
1192
1193         if (bk->lb_param & LPF_DRYRUN)
1194                 return 0;
1195
1196         memset(la, 0, sizeof(*la));
1197         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
1198         la->la_valid = LA_MODE | LA_UID | LA_GID;
1199         memset(dof, 0, sizeof(*dof));
1200         dof->dof_type = dt_mode_to_dft(S_IFREG);
1201
1202         th = dt_trans_create(env, dt);
1203         if (IS_ERR(th))
1204                 GOTO(log, rc = PTR_ERR(th));
1205
1206         rc = dt_declare_create(env, obj, la, NULL, dof, th);
1207         if (rc != 0)
1208                 GOTO(stop, rc);
1209
1210         rc = dt_declare_record_write(env, obj,
1211                                      lfsck_buf_get(env, &lastid,
1212                                                    sizeof(lastid)),
1213                                      pos, th);
1214         if (rc != 0)
1215                 GOTO(stop, rc);
1216
1217         rc = dt_trans_start_local(env, dt, th);
1218         if (rc != 0)
1219                 GOTO(stop, rc);
1220
1221         dt_write_lock(env, obj, 0);
1222         if (likely(dt_object_exists(obj) == 0)) {
1223                 rc = dt_create(env, obj, la, NULL, dof, th);
1224                 if (rc == 0)
1225                         rc = dt_record_write(env, obj,
1226                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
1227                                 &pos, th);
1228         }
1229         dt_write_unlock(env, obj);
1230
1231         GOTO(stop, rc);
1232
1233 stop:
1234         dt_trans_stop(env, dt, th);
1235
1236 log:
1237         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
1238                "%#llx: rc = %d\n",
1239                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
1240
1241         return rc;
1242 }
1243
1244 static int
1245 lfsck_layout_lastid_reload(const struct lu_env *env,
1246                            struct lfsck_component *com,
1247                            struct lfsck_layout_seq *lls)
1248 {
1249         __u64   lastid;
1250         loff_t  pos     = 0;
1251         int     rc;
1252
1253         dt_read_lock(env, lls->lls_lastid_obj, 0);
1254         rc = dt_record_read(env, lls->lls_lastid_obj,
1255                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
1256         dt_read_unlock(env, lls->lls_lastid_obj);
1257         if (unlikely(rc != 0))
1258                 return rc;
1259
1260         lastid = le64_to_cpu(lastid);
1261         if (lastid < lls->lls_lastid_known) {
1262                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
1263                 struct lfsck_layout     *lo     = com->lc_file_ram;
1264
1265                 lls->lls_lastid = lls->lls_lastid_known;
1266                 lls->lls_dirty = 1;
1267                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1268                         LASSERT(lfsck->li_out_notify != NULL);
1269
1270                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1271                                              LE_LASTID_REBUILDING);
1272                         lo->ll_flags |= LF_CRASHED_LASTID;
1273
1274                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
1275                                "LAST_ID file (1) for the sequence %#llx"
1276                                ", old value %llu, known value %llu\n",
1277                                lfsck_lfsck2name(lfsck), lls->lls_seq,
1278                                lastid, lls->lls_lastid);
1279                 }
1280         } else if (lastid >= lls->lls_lastid) {
1281                 lls->lls_lastid = lastid;
1282                 lls->lls_dirty = 0;
1283         }
1284
1285         return 0;
1286 }
1287
1288 static int
1289 lfsck_layout_lastid_store(const struct lu_env *env,
1290                           struct lfsck_component *com)
1291 {
1292         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1293         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1294         struct dt_device                *dt     = lfsck->li_bottom;
1295         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1296         struct lfsck_layout_seq         *lls;
1297         struct thandle                  *th;
1298         __u64                            lastid;
1299         int                              rc     = 0;
1300         int                              rc1    = 0;
1301
1302         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1303                 loff_t pos = 0;
1304
1305                 if (!lls->lls_dirty)
1306                         continue;
1307
1308                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1309                        "<seq> %#llx as <oid> %llu\n",
1310                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1311
1312                 if (bk->lb_param & LPF_DRYRUN) {
1313                         lls->lls_dirty = 0;
1314                         continue;
1315                 }
1316
1317                 th = dt_trans_create(env, dt);
1318                 if (IS_ERR(th)) {
1319                         rc1 = PTR_ERR(th);
1320                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1321                                "the LAST_ID for <seq> %#llx(1): rc = %d\n",
1322                                lfsck_lfsck2name(com->lc_lfsck),
1323                                lls->lls_seq, rc1);
1324                         continue;
1325                 }
1326
1327                 lastid = cpu_to_le64(lls->lls_lastid);
1328                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1329                                              lfsck_buf_get(env, &lastid,
1330                                                            sizeof(lastid)),
1331                                              pos, th);
1332                 if (rc != 0)
1333                         goto stop;
1334
1335                 rc = dt_trans_start_local(env, dt, th);
1336                 if (rc != 0)
1337                         goto stop;
1338
1339                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1340                 rc = dt_record_write(env, lls->lls_lastid_obj,
1341                                      lfsck_buf_get(env, &lastid,
1342                                      sizeof(lastid)), &pos, th);
1343                 dt_write_unlock(env, lls->lls_lastid_obj);
1344                 if (rc == 0)
1345                         lls->lls_dirty = 0;
1346
1347 stop:
1348                 dt_trans_stop(env, dt, th);
1349                 if (rc != 0) {
1350                         rc1 = rc;
1351                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1352                                "the LAST_ID for <seq> %#llx(2): rc = %d\n",
1353                                lfsck_lfsck2name(com->lc_lfsck),
1354                                lls->lls_seq, rc1);
1355                 }
1356         }
1357
1358         return rc1;
1359 }
1360
1361 static int
1362 lfsck_layout_lastid_load(const struct lu_env *env,
1363                          struct lfsck_component *com,
1364                          struct lfsck_layout_seq *lls)
1365 {
1366         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1367         struct lfsck_layout     *lo     = com->lc_file_ram;
1368         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1369         struct dt_object        *obj;
1370         loff_t                   pos    = 0;
1371         int                      rc;
1372         ENTRY;
1373
1374         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck));
1375         obj = dt_locate(env, lfsck->li_bottom, fid);
1376         if (IS_ERR(obj))
1377                 RETURN(PTR_ERR(obj));
1378
1379         /* LAST_ID crashed, to be rebuilt */
1380         if (dt_object_exists(obj) == 0) {
1381                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1382                         LASSERT(lfsck->li_out_notify != NULL);
1383
1384                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1385                                              LE_LASTID_REBUILDING);
1386                         lo->ll_flags |= LF_CRASHED_LASTID;
1387
1388                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1389                                "LAST_ID file for sequence %#llx\n",
1390                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1391
1392                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1393                             cfs_fail_val > 0) {
1394                                 struct l_wait_info lwi = LWI_TIMEOUT(
1395                                                 cfs_time_seconds(cfs_fail_val),
1396                                                 NULL, NULL);
1397
1398                                 /* Some others may changed the cfs_fail_val
1399                                  * as zero after above check, re-check it for
1400                                  * sure to avoid falling into wait for ever. */
1401                                 if (likely(lwi.lwi_timeout > 0)) {
1402                                         struct ptlrpc_thread *thread =
1403                                                 &lfsck->li_thread;
1404
1405                                         up_write(&com->lc_sem);
1406                                         l_wait_event(thread->t_ctl_waitq,
1407                                                      !thread_is_running(thread),
1408                                                      &lwi);
1409                                         down_write(&com->lc_sem);
1410                                 }
1411                         }
1412                 }
1413
1414                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1415         } else {
1416                 dt_read_lock(env, obj, 0);
1417                 rc = dt_read(env, obj,
1418                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1419                         &pos);
1420                 dt_read_unlock(env, obj);
1421                 if (rc != 0 && rc != sizeof(__u64))
1422                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1423
1424                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1425                         LASSERT(lfsck->li_out_notify != NULL);
1426
1427                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1428                                              LE_LASTID_REBUILDING);
1429                         lo->ll_flags |= LF_CRASHED_LASTID;
1430
1431                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1432                                "LAST_ID file for the sequence %#llx"
1433                                ": rc = %d\n",
1434                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1435                 }
1436
1437                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1438                 rc = 0;
1439         }
1440
1441         GOTO(out, rc);
1442
1443 out:
1444         if (rc != 0)
1445                 lfsck_object_put(env, obj);
1446         else
1447                 lls->lls_lastid_obj = obj;
1448
1449         return rc;
1450 }
1451
1452 static void lfsck_layout_record_failure(const struct lu_env *env,
1453                                         struct lfsck_instance *lfsck,
1454                                         struct lfsck_layout *lo)
1455 {
1456         __u64 cookie;
1457
1458         lo->ll_objs_failed_phase1++;
1459         cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1460                                                         lfsck->li_di_oit);
1461         if (lo->ll_pos_first_inconsistent == 0 ||
1462             lo->ll_pos_first_inconsistent < cookie) {
1463                 lo->ll_pos_first_inconsistent = cookie;
1464
1465                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1466                        "inconsistency at the pos [%llu]\n",
1467                        lfsck_lfsck2name(lfsck),
1468                        lo->ll_pos_first_inconsistent);
1469         }
1470 }
1471
1472 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1473                                            struct lfsck_component *com,
1474                                            int rc)
1475 {
1476         struct lfsck_instance   *lfsck = com->lc_lfsck;
1477         struct lfsck_layout     *lo    = com->lc_file_ram;
1478
1479         CDEBUG(D_LFSCK, "%s: layout LFSCK double scan: rc = %d\n",
1480                lfsck_lfsck2name(lfsck), rc);
1481
1482         down_write(&com->lc_sem);
1483         lo->ll_run_time_phase2 += ktime_get_seconds() -
1484                                   com->lc_time_last_checkpoint;
1485         lo->ll_time_last_checkpoint = ktime_get_real_seconds();
1486         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1487
1488         if (rc > 0) {
1489                 if (lo->ll_flags & LF_INCOMPLETE) {
1490                         lo->ll_status = LS_PARTIAL;
1491                 } else {
1492                         if (lfsck->li_master) {
1493                                 struct lfsck_assistant_data *lad = com->lc_data;
1494
1495                                 if (test_bit(LAD_INCOMPLETE, &lad->lad_flags))
1496                                         lo->ll_status = LS_PARTIAL;
1497                                 else
1498                                         lo->ll_status = LS_COMPLETED;
1499                         } else {
1500                                 lo->ll_status = LS_COMPLETED;
1501                         }
1502                 }
1503                 lo->ll_flags &= ~LF_SCANNED_ONCE;
1504                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1505                         lo->ll_flags &= ~LF_INCONSISTENT;
1506                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1507                 lo->ll_success_count++;
1508         } else if (rc == 0) {
1509                 if (lfsck->li_status != 0)
1510                         lo->ll_status = lfsck->li_status;
1511                 else
1512                         lo->ll_status = LS_STOPPED;
1513         } else {
1514                 lo->ll_status = LS_FAILED;
1515         }
1516
1517         rc = lfsck_layout_store(env, com);
1518         up_write(&com->lc_sem);
1519
1520         CDEBUG(D_LFSCK, "%s: layout LFSCK double scan result %u: rc = %d\n",
1521                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
1522
1523         return rc;
1524 }
1525
1526 static int lfsck_layout_trans_stop(const struct lu_env *env,
1527                                    struct dt_device *dev,
1528                                    struct thandle *handle, int result)
1529 {
1530         int rc;
1531
1532         /* XXX: If there is something worng or it needs to repair nothing,
1533          *      then notify the lower to stop the modification. Currently,
1534          *      we use th_result for such purpose, that may be replaced by
1535          *      some rollback mechanism in the future. */
1536         handle->th_result = result;
1537         rc = dt_trans_stop(env, dev, handle);
1538         if (result != 0)
1539                 return result > 0 ? 0 : result;
1540
1541         return rc == 0 ? 1 : rc;
1542 }
1543
1544 static int lfsck_layout_ins_dangling_rec(const struct lu_env *env,
1545                                          struct lfsck_component *com,
1546                                          const struct lu_fid *pfid,
1547                                          const struct lu_fid *cfid,
1548                                          __u32 comp_id, __u32 ea_off,
1549                                          __u32 ost_idx)
1550 {
1551         struct lfsck_layout_dangling_key *key = &lfsck_env_info(env)->lti_lldk;
1552         struct lu_fid *rec = &lfsck_env_info(env)->lti_fid3;
1553         struct dt_device *dev;
1554         struct dt_object *obj;
1555         struct thandle *th = NULL;
1556         int idx;
1557         int rc = 0;
1558         ENTRY;
1559
1560         idx = lfsck_sub_trace_file_fid2idx(pfid);
1561         obj = com->lc_sub_trace_objs[idx].lsto_obj;
1562         dev = lfsck_obj2dev(obj);
1563
1564         fid_cpu_to_be(&key->lldk_fid, pfid);
1565         key->lldk_comp_id = cpu_to_be32(comp_id);
1566         key->lldk_ea_off = cpu_to_be32(ea_off);
1567
1568         fid_cpu_to_be(rec, cfid);
1569         rec->f_ver = cpu_to_be32(ost_idx);
1570
1571         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1572
1573         th = dt_trans_create(env, dev);
1574         if (IS_ERR(th))
1575                 GOTO(unlock, rc = PTR_ERR(th));
1576
1577         rc = dt_declare_insert(env, obj,
1578                                (const struct dt_rec *)rec,
1579                                (const struct dt_key *)key, th);
1580         if (rc)
1581                 GOTO(unlock, rc);
1582
1583         rc = dt_trans_start_local(env, dev, th);
1584         if (rc)
1585                 GOTO(unlock, rc);
1586
1587         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
1588                        (const struct dt_key *)key, th);
1589
1590         GOTO(unlock, rc);
1591
1592 unlock:
1593         if (th && !IS_ERR(th))
1594                 dt_trans_stop(env, dev, th);
1595
1596         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1597
1598         CDEBUG(D_LFSCK, "%s: insert the paris "DFID" => "DFID", comp_id = %u, "
1599                "ea_off = %u, ost_idx = %u, into the trace file for further "
1600                "dangling check: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1601                PFID(pfid), PFID(cfid), comp_id, ea_off, ost_idx, rc);
1602
1603         return rc;
1604 }
1605
1606 static int lfsck_layout_del_dangling_rec(const struct lu_env *env,
1607                                          struct lfsck_component *com,
1608                                          const struct lu_fid *fid,
1609                                          __u32 comp_id, __u32 ea_off)
1610 {
1611         struct lfsck_layout_dangling_key *key = &lfsck_env_info(env)->lti_lldk;
1612         struct dt_device *dev;
1613         struct dt_object *obj;
1614         struct thandle *th = NULL;
1615         int idx;
1616         int rc = 0;
1617         ENTRY;
1618
1619         idx = lfsck_sub_trace_file_fid2idx(fid);
1620         obj = com->lc_sub_trace_objs[idx].lsto_obj;
1621         dev = lfsck_obj2dev(obj);
1622
1623         fid_cpu_to_be(&key->lldk_fid, fid);
1624         key->lldk_comp_id = cpu_to_be32(comp_id);
1625         key->lldk_ea_off = cpu_to_be32(ea_off);
1626
1627         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1628
1629         th = dt_trans_create(env, dev);
1630         if (IS_ERR(th))
1631                 GOTO(unlock, rc = PTR_ERR(th));
1632
1633         rc = dt_declare_delete(env, obj, (const struct dt_key *)key, th);
1634         if (rc)
1635                 GOTO(unlock, rc);
1636
1637         rc = dt_trans_start_local(env, dev, th);
1638         if (rc)
1639                 GOTO(unlock, rc);
1640
1641         rc = dt_delete(env, obj, (const struct dt_key *)key, th);
1642
1643         GOTO(unlock, rc);
1644
1645 unlock:
1646         if (th && !IS_ERR(th))
1647                 dt_trans_stop(env, dev, th);
1648
1649         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1650
1651         CDEBUG(D_LFSCK, "%s: delete the dangling record for "DFID
1652                ", comp_id = %u, ea_off = %u from the trace file: rc = %d\n",
1653                lfsck_lfsck2name(com->lc_lfsck), PFID(fid), comp_id, ea_off, rc);
1654
1655         return rc;
1656 }
1657
1658 /**
1659  * Get the system default stripe size.
1660  *
1661  * \param[in] env       pointer to the thread context
1662  * \param[in] lfsck     pointer to the lfsck instance
1663  * \param[out] size     pointer to the default stripe size
1664  *
1665  * \retval              0 for success
1666  * \retval              negative error number on failure
1667  */
1668 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1669                                            struct lfsck_instance *lfsck,
1670                                            __u32 *size)
1671 {
1672         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1673         struct dt_object        *root;
1674         int                      rc;
1675
1676         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1677         if (IS_ERR(root))
1678                 return PTR_ERR(root);
1679
1680         /* Get the default stripe size via xattr_get on the backend root. */
1681         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1682                           XATTR_NAME_LOV);
1683         if (rc > 0) {
1684                 /* The lum->lmm_stripe_size is LE mode. The *size also
1685                  * should be LE mode. So it is unnecessary to convert. */
1686                 *size = lum->lmm_stripe_size;
1687                 rc = 0;
1688         } else if (unlikely(rc == 0)) {
1689                 rc = -EINVAL;
1690         }
1691
1692         lfsck_object_put(env, root);
1693
1694         return rc;
1695 }
1696
1697 /**
1698  * \retval       +1: repaired
1699  * \retval        0: did nothing
1700  * \retval      -ve: on error
1701  */
1702 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1703                                      struct lfsck_instance *lfsck,
1704                                      struct thandle *handle,
1705                                      struct dt_object *parent,
1706                                      const struct lu_fid *cfid,
1707                                      struct lu_buf *buf,
1708                                      struct lov_mds_md_v1 *lmm,
1709                                      struct lov_ost_data_v1 *slot,
1710                                      int fl, __u32 ost_idx, int size)
1711 {
1712         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1713         struct lu_buf            ea_buf;
1714         int                      rc;
1715         __u32                    magic;
1716         __u32                    pattern;
1717         __u16                    count;
1718         ENTRY;
1719
1720         magic = le32_to_cpu(lmm->lmm_magic);
1721         pattern = le32_to_cpu(lmm->lmm_pattern);
1722         count = le16_to_cpu(lmm->lmm_stripe_count);
1723
1724         fid_to_ostid(cfid, oi);
1725         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1726         slot->l_ost_gen = cpu_to_le32(0);
1727         slot->l_ost_idx = cpu_to_le32(ost_idx);
1728
1729         if (pattern & LOV_PATTERN_F_HOLE) {
1730                 struct lov_ost_data_v1 *objs;
1731                 int                     i;
1732
1733                 if (magic == LOV_MAGIC_V1)
1734                         objs = &lmm->lmm_objects[0];
1735                 else
1736                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1737                 for (i = 0; i < count; i++, objs++) {
1738                         if (lovea_slot_is_dummy(objs))
1739                                 break;
1740                 }
1741
1742                 /* If the @slot is the last dummy slot to be refilled,
1743                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1744                 if (i == count) {
1745                         lmm->lmm_pattern =
1746                                 cpu_to_le32(pattern & ~LOV_PATTERN_F_HOLE);
1747
1748                         CDEBUG(D_LFSCK, "%s: remove layout HOLE for "DFID
1749                                ": parent "DFID"\n", lfsck_lfsck2name(lfsck),
1750                                PFID(cfid), PFID(lfsck_dto2fid(parent)));
1751                 }
1752         }
1753
1754         lfsck_buf_init(&ea_buf, buf->lb_buf, size);
1755         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle);
1756         if (rc == 0)
1757                 rc = 1;
1758
1759         RETURN(rc);
1760 }
1761
1762 static struct lov_ost_data_v1 *
1763 __lfsck_layout_new_v1_lovea(struct lov_mds_md_v1 *lmm,
1764                             const struct lu_fid *pfid,
1765                             __u32 stripe_size, __u32 ea_off,
1766                             __u32 pattern, __u16 count)
1767 {
1768         lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1769         lmm->lmm_pattern = cpu_to_le32(pattern);
1770         fid_to_lmm_oi(pfid, &lmm->lmm_oi);
1771         lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1772         lmm->lmm_stripe_size = cpu_to_le32(stripe_size);
1773         lmm->lmm_stripe_count = cpu_to_le16(count);
1774         lmm->lmm_layout_gen = cpu_to_le16(1);
1775         memset(&lmm->lmm_objects[0], 0,
1776                sizeof(struct lov_ost_data_v1) * count);
1777
1778         return &lmm->lmm_objects[ea_off];
1779 }
1780
1781 static int lfsck_layout_new_v1_lovea(const struct lu_env *env,
1782                                      struct lfsck_instance *lfsck,
1783                                      struct ost_layout *ol,
1784                                      struct dt_object *parent,
1785                                      struct lu_buf *buf, __u32 ea_off,
1786                                      struct lov_mds_md_v1 **lmm,
1787                                      struct lov_ost_data_v1 **objs)
1788 {
1789         int size;
1790         __u32 stripe_size = ol->ol_stripe_size;
1791         __u32 pattern = LOV_PATTERN_RAID0;
1792         __u16 count;
1793
1794         if (ol->ol_stripe_count != 0)
1795                 count = ol->ol_stripe_count;
1796         else
1797                 count = ea_off + 1;
1798
1799         size = lov_mds_md_size(count, LOV_MAGIC_V1);
1800         LASSERTF(buf->lb_len >= size,
1801                  "buffer len %d is less than real size %d\n",
1802                  (int)buf->lb_len, size);
1803
1804         if (stripe_size == 0) {
1805                 int rc;
1806
1807                 rc = lfsck_layout_get_def_stripesize(env, lfsck, &stripe_size);
1808                 if (rc)
1809                         return rc;
1810         }
1811
1812         *lmm = buf->lb_buf;
1813         if (ol->ol_stripe_count > 1 ||
1814             (ol->ol_stripe_count == 0 && ea_off != 0)) {
1815                 pattern |= LOV_PATTERN_F_HOLE;
1816                 memset(&(*lmm)->lmm_objects[0], 0,
1817                        count * sizeof(struct lov_ost_data_v1));
1818         }
1819
1820         *objs = __lfsck_layout_new_v1_lovea(*lmm, lfsck_dto2fid(parent),
1821                                 stripe_size, ea_off, pattern, count);
1822
1823         return size;
1824 }
1825
1826 static int lfsck_layout_new_comp_lovea(const struct lu_env *env,
1827                                        struct lu_orphan_rec_v3 *rec,
1828                                        struct dt_object *parent,
1829                                        struct lu_buf *buf, __u32 ea_off,
1830                                        struct lov_mds_md_v1 **lmm,
1831                                        struct lov_ost_data_v1 **objs)
1832 {
1833         struct ost_layout *ol = &rec->lor_layout;
1834         struct lov_comp_md_v1 *lcm;
1835         struct lov_comp_md_entry_v1 *lcme;
1836         __u32 pattern = LOV_PATTERN_RAID0;
1837         __u32 offset = sizeof(*lcm) + sizeof(*lcme);
1838         int lcme_size = lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
1839         int size = offset + lcme_size;
1840
1841         LASSERTF(buf->lb_len >= size,
1842                  "buffer len %d is less than real size %d\n",
1843                  (int)buf->lb_len, size);
1844
1845         lcm = buf->lb_buf;
1846         lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
1847         lcm->lcm_size = cpu_to_le32(size);
1848         if (rec->lor_range) {
1849                 lcm->lcm_layout_gen = cpu_to_le32(rec->lor_layout_version +
1850                                                   rec->lor_range);
1851                 lcm->lcm_flags = cpu_to_le16(LCM_FL_WRITE_PENDING);
1852         } else if (rec->lor_layout_version) {
1853                 lcm->lcm_layout_gen = cpu_to_le32(rec->lor_layout_version +
1854                                                   rec->lor_range);
1855                 lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE);
1856         } else {
1857                 lcm->lcm_layout_gen = cpu_to_le32(1);
1858                 lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE);
1859         }
1860         lcm->lcm_entry_count = cpu_to_le16(1);
1861         /* Currently, we do not know how many mirrors will be, set it as zero
1862          * at the beginning. It will be updated when more mirrors are found. */
1863         lcm->lcm_mirror_count = 0;
1864
1865         lcme = &lcm->lcm_entries[0];
1866         lcme->lcme_id = cpu_to_le32(ol->ol_comp_id);
1867         lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
1868         lcme->lcme_extent.e_start = cpu_to_le64(ol->ol_comp_start);
1869         lcme->lcme_extent.e_end = cpu_to_le64(ol->ol_comp_end);
1870         lcme->lcme_offset = cpu_to_le32(offset);
1871         lcme->lcme_size = cpu_to_le32(lcme_size);
1872         lcme->lcme_layout_gen = lcm->lcm_layout_gen;
1873         if (ol->ol_stripe_count > 1)
1874                 pattern |= LOV_PATTERN_F_HOLE;
1875
1876         *lmm = buf->lb_buf + offset;
1877         *objs = __lfsck_layout_new_v1_lovea(*lmm, lfsck_dto2fid(parent),
1878                                             ol->ol_stripe_size, ea_off,
1879                                             pattern, ol->ol_stripe_count);
1880
1881         return size;
1882 }
1883
1884 static void lfsck_layout_update_lcm(struct lov_comp_md_v1 *lcm,
1885                                     struct lov_comp_md_entry_v1 *lcme,
1886                                     __u32 version, __u32 range)
1887 {
1888         struct lov_comp_md_entry_v1 *tmp;
1889         __u64 start = le64_to_cpu(lcme->lcme_extent.e_start);
1890         __u64 end = le64_to_cpu(lcme->lcme_extent.e_end);
1891         __u32 gen = version + range;
1892         __u32 tmp_gen;
1893         int i;
1894         __u16 count = le16_to_cpu(lcm->lcm_entry_count);
1895         __u16 flags = le16_to_cpu(lcm->lcm_flags);
1896
1897         if (!gen)
1898                 gen = 1;
1899         lcme->lcme_layout_gen = cpu_to_le32(gen);
1900         if (le32_to_cpu(lcm->lcm_layout_gen) < gen)
1901                 lcm->lcm_layout_gen = cpu_to_le32(gen);
1902
1903         if (range)
1904                 lcm->lcm_flags = cpu_to_le16(LCM_FL_WRITE_PENDING);
1905         else if (flags == LCM_FL_NONE && le16_to_cpu(lcm->lcm_mirror_count) > 0)
1906                 lcm->lcm_flags = cpu_to_le16(LCM_FL_RDONLY);
1907
1908         for (i = 0; i < count; i++) {
1909                 tmp = &lcm->lcm_entries[i];
1910                 if (le64_to_cpu(tmp->lcme_extent.e_end) <= start)
1911                         continue;
1912
1913                 if (le64_to_cpu(tmp->lcme_extent.e_start) >= end)
1914                         continue;
1915
1916                 if (le32_to_cpu(tmp->lcme_flags) & LCME_FL_STALE)
1917                         continue;
1918
1919                 tmp_gen = le32_to_cpu(tmp->lcme_layout_gen);
1920                 /* "lcme_layout_gen == 0" but without LCME_FL_STALE flag,
1921                  * then it should be the latest version of all mirrors. */
1922                 if (tmp_gen == 0 || tmp_gen > gen) {
1923                         lcme->lcme_flags = cpu_to_le32(
1924                                 le32_to_cpu(lcme->lcme_flags) | LCME_FL_STALE);
1925                         break;
1926                 }
1927
1928                 if (tmp_gen < gen)
1929                         tmp->lcme_flags = cpu_to_le32(
1930                                 le32_to_cpu(tmp->lcme_flags) | LCME_FL_STALE);
1931         }
1932 }
1933
1934 static int lfsck_layout_add_comp(const struct lu_env *env,
1935                                  struct lfsck_instance *lfsck,
1936                                  struct thandle *handle,
1937                                  struct lu_orphan_rec_v3 *rec,
1938                                  struct dt_object *parent,
1939                                  const struct lu_fid *cfid,
1940                                  struct lu_buf *buf, __u32 ost_idx,
1941                                  __u32 ea_off, int pos, bool new_mirror)
1942 {
1943         struct ost_layout *ol = &rec->lor_layout;
1944         struct lov_comp_md_v1 *lcm = buf->lb_buf;
1945         struct lov_comp_md_entry_v1 *lcme;
1946         struct lov_mds_md_v1 *lmm;
1947         struct lov_ost_data_v1 *objs;
1948         int added = sizeof(*lcme) +
1949                     lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
1950         int size = le32_to_cpu(lcm->lcm_size) + added;
1951         int rc;
1952         int i;
1953         __u32 offset;
1954         __u32 pattern = LOV_PATTERN_RAID0;
1955         __u16 count = le16_to_cpu(lcm->lcm_entry_count);
1956         ENTRY;
1957
1958         lu_buf_check_and_grow(buf, size);
1959         /* set the lcm again because lu_buf_check_and_grow() may
1960          * have reallocated the buf. */
1961         lcm = buf->lb_buf;
1962         lcm->lcm_size = cpu_to_le32(size);
1963         lcm->lcm_entry_count = cpu_to_le16(count + 1);
1964         if (new_mirror)
1965                 le16_add_cpu(&lcm->lcm_mirror_count, 1);
1966
1967         /* 1. Move the component bodies from [pos, count-1] to [pos+1, count]
1968          *    with distance of 'added'. */
1969         if (pos < count) {
1970                 size = 0;
1971                 for (i = pos; i < count; i++) {
1972                         lcme = &lcm->lcm_entries[i];
1973                         size += le32_to_cpu(lcme->lcme_size);
1974                 }
1975
1976                 offset = le32_to_cpu(lcm->lcm_entries[pos].lcme_offset);
1977                 memmove(buf->lb_buf + offset + added,
1978                         buf->lb_buf + offset, size);
1979         }
1980
1981         size = 0;
1982         /* 2. Move the component header [0, pos-1] to [0, pos-1] with distance
1983          *    of 'sizeof(struct lov_comp_md_entry_v1)' */
1984         if (pos > 0) {
1985                 for (i = 0; i < pos; i++) {
1986                         lcme = &lcm->lcm_entries[i];
1987                         size += le32_to_cpu(lcme->lcme_size);
1988                 }
1989
1990                 offset = le32_to_cpu(lcm->lcm_entries[0].lcme_offset);
1991                 memmove(buf->lb_buf + offset + sizeof(*lcme),
1992                         buf->lb_buf + offset, size);
1993         }
1994
1995         /* 3. Recalculate the enter offset for the component [pos, count-1] */
1996         for (i = count - 1; i >= pos; i--) {
1997                 lcm->lcm_entries[i + 1] = lcm->lcm_entries[i];
1998                 lcm->lcm_entries[i + 1].lcme_offset =
1999                         cpu_to_le32(le32_to_cpu(lcm->lcm_entries[i + 1].
2000                                                 lcme_offset) + added);
2001         }
2002
2003         /* 4. Recalculate the enter offset for the component [0, pos) */
2004         for (i = 0; i < pos; i++) {
2005                 lcm->lcm_entries[i].lcme_offset =
2006                         cpu_to_le32(le32_to_cpu(lcm->lcm_entries[i].
2007                                                 lcme_offset) + sizeof(*lcme));
2008         }
2009
2010         offset = sizeof(*lcm) + sizeof(*lcme) * (count + 1) + size;
2011         /* 4. Insert the new component header (entry) at the slot 'pos'. */
2012         lcme = &lcm->lcm_entries[pos];
2013         lcme->lcme_id = cpu_to_le32(ol->ol_comp_id);
2014         lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
2015         lcme->lcme_extent.e_start = cpu_to_le64(ol->ol_comp_start);
2016         lcme->lcme_extent.e_end = cpu_to_le64(ol->ol_comp_end);
2017         lcme->lcme_offset = cpu_to_le32(offset);
2018         lcme->lcme_size = cpu_to_le32(lov_mds_md_size(ol->ol_stripe_count,
2019                                                       LOV_MAGIC_V1));
2020
2021         if (ol->ol_stripe_count > 1)
2022                 pattern |= LOV_PATTERN_F_HOLE;
2023
2024         lmm = buf->lb_buf + offset;
2025         /* 5. Insert teh new component body at the 'offset'. */
2026         objs = __lfsck_layout_new_v1_lovea(lmm, lfsck_dto2fid(parent),
2027                                            ol->ol_stripe_size, ea_off,
2028                                            pattern, ol->ol_stripe_count);
2029
2030         /* 6. Update mirror related flags and version. */
2031         lfsck_layout_update_lcm(lcm, lcme, rec->lor_layout_version,
2032                                 rec->lor_range);
2033
2034         rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid, buf,
2035                                        lmm, objs, LU_XATTR_REPLACE, ost_idx,
2036                                        le32_to_cpu(lcm->lcm_size));
2037
2038         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant add new COMP for "
2039                DFID": parent "DFID", OST-index %u, stripe-index %u, "
2040                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
2041                "comp_end %llu, layout version %u, range %u, "
2042                "%s LOV EA hole: rc = %d\n",
2043                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
2044                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
2045                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end,
2046                rec->lor_layout_version, rec->lor_range,
2047                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
2048                "with" : "without", rc);
2049
2050         RETURN(rc);
2051 }
2052
2053 static int lfsck_layout_extend_v1v3_lovea(const struct lu_env *env,
2054                                           struct lfsck_instance *lfsck,
2055                                           struct thandle *handle,
2056                                           struct ost_layout *ol,
2057                                           struct dt_object *parent,
2058                                           const struct lu_fid *cfid,
2059                                           struct lu_buf *buf, __u32 ost_idx,
2060                                           __u32 ea_off)
2061 {
2062         struct lov_mds_md_v1 *lmm = buf->lb_buf;
2063         struct lov_ost_data_v1 *objs;
2064         __u16 count = le16_to_cpu(lmm->lmm_stripe_count);
2065         __u32 magic = le32_to_cpu(lmm->lmm_magic);
2066         int size;
2067         int gap;
2068         int rc;
2069         ENTRY;
2070
2071         /* The original LOVEA maybe re-generated via old filter_fid, at
2072          * that time, we do not know the stripe count and stripe size. */
2073         if (ol->ol_stripe_count > count)
2074                 count = ol->ol_stripe_count;
2075         if (ol->ol_stripe_size != 0 &&
2076             ol->ol_stripe_size != le32_to_cpu(lmm->lmm_stripe_size))
2077                 lmm->lmm_stripe_size = cpu_to_le32(ol->ol_stripe_size);
2078
2079         if (magic == LOV_MAGIC_V1)
2080                 objs = &lmm->lmm_objects[count];
2081         else
2082                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[count];
2083
2084         gap = ea_off - count;
2085         if (gap >= 0)
2086                 count = ea_off + 1;
2087
2088         size = lov_mds_md_size(count, magic);
2089         LASSERTF(buf->lb_len >= size,
2090                  "buffer len %d is less than real size %d\n",
2091                  (int)buf->lb_len, size);
2092
2093         if (gap > 0) {
2094                 memset(objs, 0, gap * sizeof(*objs));
2095                 lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
2096         }
2097
2098         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2099         lmm->lmm_stripe_count = cpu_to_le16(count);
2100         objs += gap;
2101
2102         rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid, buf,
2103                                 lmm, objs, LU_XATTR_REPLACE, ost_idx, size);
2104
2105         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
2106                DFID": parent "DFID", OST-index %u, stripe-index %u, "
2107                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
2108                "comp_end %llu, %s LOV EA hole: rc = %d\n",
2109                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
2110                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
2111                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end,
2112                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
2113                "with" : "without", rc);
2114
2115         RETURN(rc);
2116 }
2117
2118 /**
2119  * \retval       +1: repaired
2120  * \retval        0: did nothing
2121  * \retval      -ve: on error
2122  */
2123 static int lfsck_layout_update_lovea(const struct lu_env *env,
2124                                      struct lfsck_instance *lfsck,
2125                                      struct thandle *handle,
2126                                      struct lu_orphan_rec_v3 *rec,
2127                                      struct dt_object *parent,
2128                                      const struct lu_fid *cfid,
2129                                      struct lu_buf *buf, int fl,
2130                                      __u32 ost_idx, __u32 ea_off)
2131 {
2132         struct ost_layout *ol = &rec->lor_layout;
2133         struct lov_mds_md_v1 *lmm = NULL;
2134         struct lov_ost_data_v1 *objs = NULL;
2135         int rc = 0;
2136         ENTRY;
2137
2138         if (ol->ol_comp_id != 0)
2139                 rc = lfsck_layout_new_comp_lovea(env, rec, parent, buf, ea_off,
2140                                                  &lmm, &objs);
2141         else
2142                 rc = lfsck_layout_new_v1_lovea(env, lfsck, &rec->lor_layout,
2143                                                parent, buf, ea_off, &lmm,
2144                                                &objs);
2145         if (rc > 0)
2146                 rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid,
2147                                                buf, lmm, objs, fl, ost_idx, rc);
2148
2149         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant created layout EA for "
2150                DFID": parent "DFID", OST-index %u, stripe-index %u, "
2151                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
2152                "comp_end %llu, layout version %u, range %u, fl %d, "
2153                "%s LOV EA hole: rc = %d\n",
2154                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
2155                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
2156                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end,
2157                rec->lor_layout_version, rec->lor_range, fl,
2158                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
2159                "with" : "without", rc);
2160
2161         RETURN(rc);
2162 }
2163
2164 static int __lfsck_layout_update_pfid(const struct lu_env *env,
2165                                       struct dt_object *child,
2166                                       const struct lu_fid *pfid,
2167                                       const struct ost_layout *ol, __u32 offset,
2168                                       __u32 version, __u32 range)
2169 {
2170         struct dt_device        *dev    = lfsck_obj2dev(child);
2171         struct filter_fid       *ff     = &lfsck_env_info(env)->lti_ff;
2172         struct thandle          *handle;
2173         struct lu_buf            buf    = { NULL };
2174         int                      rc;
2175
2176         ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2177         ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2178         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2179          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2180          * parent MDT-object's layout EA. */
2181         ff->ff_parent.f_stripe_idx = cpu_to_le32(offset);
2182         ost_layout_cpu_to_le(&ff->ff_layout, ol);
2183         ff->ff_layout_version = cpu_to_le32(version);
2184         ff->ff_range = cpu_to_le32(range);
2185         lfsck_buf_init(&buf, ff, sizeof(*ff));
2186
2187         handle = dt_trans_create(env, dev);
2188         if (IS_ERR(handle))
2189                 RETURN(PTR_ERR(handle));
2190
2191         rc = dt_declare_xattr_set(env, child, &buf, XATTR_NAME_FID, 0, handle);
2192         if (rc != 0)
2193                 GOTO(stop, rc);
2194
2195         rc = dt_trans_start_local(env, dev, handle);
2196         if (rc != 0)
2197                 GOTO(stop, rc);
2198
2199         rc = dt_xattr_set(env, child, &buf, XATTR_NAME_FID, 0, handle);
2200
2201         GOTO(stop, rc);
2202
2203 stop:
2204         dt_trans_stop(env, dev, handle);
2205
2206         return rc;
2207 }
2208
2209 /**
2210  * \retval       +1: repaired
2211  * \retval        0: did nothing
2212  * \retval      -ve: on error
2213  */
2214 static int lfsck_layout_update_pfid(const struct lu_env *env,
2215                                     struct lfsck_component *com,
2216                                     struct dt_object *parent,
2217                                     struct lu_fid *cfid,
2218                                     struct dt_device *cdev,
2219                                     struct lu_orphan_rec_v3 *rec, __u32 ea_off)
2220 {
2221         struct dt_object        *child;
2222         int                      rc     = 0;
2223         ENTRY;
2224
2225         child = lfsck_object_find_by_dev(env, cdev, cfid);
2226         if (IS_ERR(child))
2227                 RETURN(PTR_ERR(child));
2228
2229         rc = __lfsck_layout_update_pfid(env, child,
2230                                         lu_object_fid(&parent->do_lu),
2231                                         &rec->lor_layout, ea_off,
2232                                         rec->lor_layout_version,
2233                                         rec->lor_range);
2234         lfsck_object_put(env, child);
2235
2236         RETURN(rc == 0 ? 1 : rc);
2237 }
2238
2239 static int lfsck_lovea_size(struct ost_layout *ol, __u32 ea_off)
2240 {
2241         if (ol->ol_comp_id != 0)
2242                 return sizeof(struct lov_comp_md_v1) +
2243                        sizeof(struct lov_comp_md_entry_v1) +
2244                        lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
2245
2246         if (ol->ol_stripe_count != 0)
2247                 return lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
2248
2249         return lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2250 }
2251
2252 /**
2253  * This function will create the MDT-object with the given (partial) LOV EA.
2254  *
2255  * Under some data corruption cases, the MDT-object of the file may be lost,
2256  * but its OST-objects, or some of them are there. The layout LFSCK needs to
2257  * re-create the MDT-object with the orphan OST-object(s) information.
2258  *
2259  * On the other hand, the LFSCK may has created some OST-object for repairing
2260  * dangling LOV EA reference, but as the LFSCK processing, it may find that
2261  * the old OST-object is there and should replace the former new created OST
2262  * object. Unfortunately, some others have modified such newly created object.
2263  * To keep the data (both new and old), the LFSCK will create MDT-object with
2264  * new FID to reference the original OST-object.
2265  *
2266  * \param[in] env       pointer to the thread context
2267  * \param[in] com       pointer to the lfsck component
2268  * \param[in] ltd       pointer to target device descriptor
2269  * \param[in] rec       pointer to the record for the orphan OST-object
2270  * \param[in] cfid      pointer to FID for the orphan OST-object
2271  * \param[in] infix     additional information, such as the FID for original
2272  *                      MDT-object and the stripe offset in the LOV EA
2273  * \param[in] type      the type for describing why the orphan MDT-object is
2274  *                      created. The rules are as following:
2275  *
2276  *  type "C":           Multiple OST-objects claim the same MDT-object and the
2277  *                      same slot in the layout EA. Then the LFSCK will create
2278  *                      new MDT-object(s) to hold the conflict OST-object(s).
2279  *
2280  *  type "N":           The orphan OST-object does not know which one was the
2281  *                      real parent MDT-object, so the LFSCK uses new FID for
2282  *                      its parent MDT-object.
2283  *
2284  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
2285  *                      but does not know the position (the file name) in the
2286  *                      layout.
2287  *
2288  *  type "D":           The MDT-object is a directory, it may knows its parent
2289  *                      but because there is no valid linkEA, the LFSCK cannot
2290  *                      know where to put it back to the namespace.
2291  *  type "O":           The MDT-object has no linkEA, and there is no name
2292  *                      entry that references the MDT-object.
2293  *
2294  *  type "P":           The orphan object to be created was a parent directory
2295  *                      of some MDT-object which linkEA shows that the @orphan
2296  *                      object is missing.
2297  *
2298  * The orphan name will be like:
2299  * ${FID}-${infix}-${type}-${conflict_version}
2300  *
2301  * \param[in] ea_off    the stripe offset in the LOV EA
2302  *
2303  * \retval              positive on repaired something
2304  * \retval              0 if needs to repair nothing
2305  * \retval              negative error number on failure
2306  */
2307 static int lfsck_layout_recreate_parent(const struct lu_env *env,
2308                                         struct lfsck_component *com,
2309                                         struct lfsck_tgt_desc *ltd,
2310                                         struct lu_orphan_rec_v3 *rec,
2311                                         struct lu_fid *cfid,
2312                                         const char *infix,
2313                                         const char *type,
2314                                         __u32 ea_off)
2315 {
2316         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2317         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
2318         char                            *name   = info->lti_key;
2319         struct lu_attr                  *la     = &info->lti_la2;
2320         struct dt_object_format         *dof    = &info->lti_dof;
2321         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2322         struct lu_fid                   *pfid   = &rec->lor_rec.lor_fid;
2323         struct lu_fid                   *tfid   = &info->lti_fid3;
2324         struct dt_device                *dev    = lfsck->li_bottom;
2325         struct dt_object                *lpf    = lfsck->li_lpf_obj;
2326         struct dt_object                *pobj   = NULL;
2327         struct dt_object                *cobj   = NULL;
2328         struct thandle                  *th     = NULL;
2329         struct lu_buf                   *ea_buf = &info->lti_big_buf;
2330         struct lu_buf                    lov_buf;
2331         struct lfsck_lock_handle        *llh    = &info->lti_llh;
2332         struct linkea_data               ldata  = { NULL };
2333         struct lu_buf                    linkea_buf;
2334         const struct lu_name            *pname;
2335         int                              size   = 0;
2336         int                              idx    = 0;
2337         int                              rc     = 0;
2338         ENTRY;
2339
2340         if (unlikely(lpf == NULL))
2341                 GOTO(log, rc = -ENXIO);
2342
2343         /* We use two separated transactions to repair the inconsistency.
2344          *
2345          * 1) create the MDT-object locally.
2346          * 2) update the OST-object's PFID EA if necessary.
2347          *
2348          * If 1) succeed, but 2) failed, then the OST-object's PFID EA will be
2349          * updated when the layout LFSCK run next time.
2350          *
2351          * If 1) failed, but 2) succeed, then such MDT-object will be re-created
2352          * when the layout LFSCK run next time. */
2353
2354         if (fid_is_zero(pfid)) {
2355                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2356                 if (rc != 0)
2357                         GOTO(log, rc);
2358
2359                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2360                 if (IS_ERR(cobj))
2361                         GOTO(log, rc = PTR_ERR(cobj));
2362         }
2363
2364         pobj = lfsck_object_find_by_dev(env, dev, pfid);
2365         if (IS_ERR(pobj))
2366                 GOTO(log, rc = PTR_ERR(pobj));
2367
2368         LASSERT(infix != NULL);
2369         LASSERT(type != NULL);
2370
2371         memset(la, 0, sizeof(*la));
2372         la->la_uid = rec->lor_rec.lor_uid;
2373         la->la_gid = rec->lor_rec.lor_gid;
2374         la->la_mode = S_IFREG | S_IRUSR;
2375         la->la_valid = LA_MODE | LA_UID | LA_GID;
2376
2377         memset(dof, 0, sizeof(*dof));
2378         dof->dof_type = dt_mode_to_dft(S_IFREG);
2379         /* Because the dof->dof_reg.striped = 0, the LOD will not create
2380          * the stripe(s). The LFSCK will specify the LOV EA via
2381          * lfsck_layout_update_lovea(). */
2382
2383         size = lfsck_lovea_size(&rec->lor_layout, ea_off);
2384         if (ea_buf->lb_len < size) {
2385                 lu_buf_realloc(ea_buf, size);
2386                 if (ea_buf->lb_buf == NULL)
2387                         GOTO(log, rc = -ENOMEM);
2388         }
2389
2390 again:
2391         do {
2392                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2393                          type, idx++);
2394                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2395                                (const struct dt_key *)name);
2396                 if (rc != 0 && rc != -ENOENT)
2397                         GOTO(log, rc);
2398         } while (rc == 0);
2399
2400         rc = lfsck_lock(env, lfsck, lfsck->li_lpf_obj, name, llh,
2401                         MDS_INODELOCK_UPDATE, LCK_PW);
2402         if (rc != 0)
2403                 GOTO(log, rc);
2404
2405         /* Re-check whether the name conflict with othrs after taken
2406          * the ldlm lock. */
2407         rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2408                        (const struct dt_key *)name);
2409         if (unlikely(rc == 0)) {
2410                 lfsck_unlock(llh);
2411                 goto again;
2412         }
2413
2414         if (rc != -ENOENT)
2415                 GOTO(unlock, rc);
2416
2417         pname = lfsck_name_get_const(env, name, strlen(name));
2418         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf,
2419                               pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2420         if (rc != 0)
2421                 GOTO(unlock, rc);
2422
2423         /* The 1st transaction. */
2424         th = dt_trans_create(env, dev);
2425         if (IS_ERR(th))
2426                 GOTO(unlock, rc = PTR_ERR(th));
2427
2428         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2429         if (rc != 0)
2430                 GOTO(stop, rc);
2431
2432         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2433         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2434                                   LU_XATTR_CREATE, th);
2435         if (rc != 0)
2436                 GOTO(stop, rc);
2437
2438         dtrec->rec_fid = pfid;
2439         dtrec->rec_type = S_IFREG;
2440         rc = dt_declare_insert(env, lpf,
2441                                (const struct dt_rec *)dtrec,
2442                                (const struct dt_key *)name, th);
2443         if (rc != 0)
2444                 GOTO(stop, rc);
2445
2446         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2447                        ldata.ld_leh->leh_len);
2448         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2449                                   XATTR_NAME_LINK, 0, th);
2450         if (rc != 0)
2451                 GOTO(stop, rc);
2452
2453         rc = dt_trans_start_local(env, dev, th);
2454         if (rc != 0)
2455                 GOTO(stop, rc);
2456
2457         dt_write_lock(env, pobj, 0);
2458         rc = dt_create(env, pobj, la, NULL, dof, th);
2459         if (rc == 0)
2460                 rc = lfsck_layout_update_lovea(env, lfsck, th, rec, pobj, cfid,
2461                         &lov_buf, LU_XATTR_CREATE, ltd->ltd_index, ea_off);
2462         dt_write_unlock(env, pobj);
2463         if (rc < 0)
2464                 GOTO(stop, rc);
2465
2466         rc = dt_insert(env, lpf, (const struct dt_rec *)dtrec,
2467                        (const struct dt_key *)name, th);
2468         if (rc != 0)
2469                 GOTO(stop, rc);
2470
2471         rc = dt_xattr_set(env, pobj, &linkea_buf, XATTR_NAME_LINK, 0, th);
2472         if (rc == 0 && cobj != NULL) {
2473                 dt_trans_stop(env, dev, th);
2474                 th = NULL;
2475
2476                 /* The 2nd transaction. */
2477                 rc = __lfsck_layout_update_pfid(env, cobj, pfid,
2478                                                 &rec->lor_layout, ea_off,
2479                                                 rec->lor_layout_version,
2480                                                 rec->lor_range);
2481         }
2482
2483         GOTO(stop, rc);
2484
2485 stop:
2486         if (th != NULL)
2487                 dt_trans_stop(env, dev, th);
2488
2489 unlock:
2490         lfsck_unlock(llh);
2491
2492 log:
2493         if (cobj != NULL && !IS_ERR(cobj))
2494                 lfsck_object_put(env, cobj);
2495         if (pobj != NULL && !IS_ERR(pobj))
2496                 lfsck_object_put(env, pobj);
2497
2498         if (rc < 0)
2499                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2500                        "recreate the lost MDT-object: parent "DFID
2501                        ", child "DFID", OST-index %u, stripe-index %u, "
2502                        "infix %s, type %s: rc = %d\n",
2503                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2504                        ltd->ltd_index, ea_off, infix, type, rc);
2505
2506         return rc >= 0 ? 1 : rc;
2507 }
2508
2509 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2510                                                    struct lfsck_component *com,
2511                                                    const struct lu_fid *fid,
2512                                                    __u32 index)
2513 {
2514         struct lfsck_thread_info *info  = lfsck_env_info(env);
2515         struct lfsck_request     *lr    = &info->lti_lr;
2516         struct lfsck_instance    *lfsck = com->lc_lfsck;
2517         struct lfsck_tgt_desc    *ltd;
2518         struct ptlrpc_request    *req;
2519         struct lfsck_request     *tmp;
2520         struct obd_export        *exp;
2521         int                       rc    = 0;
2522         ENTRY;
2523
2524         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2525         if (unlikely(ltd == NULL))
2526                 RETURN(-ENXIO);
2527
2528         exp = ltd->ltd_exp;
2529         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2530                 GOTO(put, rc = -EOPNOTSUPP);
2531
2532         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2533         if (req == NULL)
2534                 GOTO(put, rc = -ENOMEM);
2535
2536         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2537         if (rc != 0) {
2538                 ptlrpc_request_free(req);
2539
2540                 GOTO(put, rc);
2541         }
2542
2543         memset(lr, 0, sizeof(*lr));
2544         lr->lr_event = LE_CONDITIONAL_DESTROY;
2545         lr->lr_active = LFSCK_TYPE_LAYOUT;
2546         lr->lr_fid = *fid;
2547
2548         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2549         *tmp = *lr;
2550         ptlrpc_request_set_replen(req);
2551
2552         rc = ptlrpc_queue_wait(req);
2553         ptlrpc_req_finished(req);
2554
2555         GOTO(put, rc);
2556
2557 put:
2558         lfsck_tgt_put(ltd);
2559
2560         return rc;
2561 }
2562
2563 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2564                                                   struct lfsck_component *com,
2565                                                   struct lfsck_request *lr)
2566 {
2567         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2568         struct lu_attr                  *la     = &info->lti_la;
2569         union ldlm_policy_data          *policy = &info->lti_policy;
2570         struct ldlm_res_id              *resid  = &info->lti_resid;
2571         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2572         struct dt_device                *dev    = lfsck->li_bottom;
2573         struct lu_fid                   *fid    = &lr->lr_fid;
2574         struct dt_object                *obj;
2575         struct thandle                  *th     = NULL;
2576         struct lustre_handle             lh     = { 0 };
2577         __u64                            flags  = 0;
2578         int                              rc     = 0;
2579         ENTRY;
2580
2581         obj = lfsck_object_find_by_dev(env, dev, fid);
2582         if (IS_ERR(obj))
2583                 RETURN(PTR_ERR(obj));
2584
2585         dt_read_lock(env, obj, 0);
2586         if (dt_object_exists(obj) == 0 ||
2587             lfsck_is_dead_obj(obj)) {
2588                 dt_read_unlock(env, obj);
2589
2590                 GOTO(put, rc = -ENOENT);
2591         }
2592
2593         /* Get obj's attr without lock firstly. */
2594         rc = dt_attr_get(env, obj, la);
2595         dt_read_unlock(env, obj);
2596         if (rc != 0)
2597                 GOTO(put, rc);
2598
2599         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2600                 GOTO(put, rc = -ETXTBSY);
2601
2602         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2603         LASSERT(lfsck->li_namespace != NULL);
2604
2605         memset(policy, 0, sizeof(*policy));
2606         policy->l_extent.end = OBD_OBJECT_EOF;
2607         ost_fid_build_resid(fid, resid);
2608         rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
2609                                     LDLM_EXTENT, policy, LCK_EX, &flags,
2610                                     ldlm_blocking_ast, ldlm_completion_ast,
2611                                     NULL, NULL, 0, LVB_T_NONE, NULL, &lh);
2612         if (rc != ELDLM_OK)
2613                 GOTO(put, rc = -EIO);
2614
2615         dt_write_lock(env, obj, 0);
2616         /* Get obj's attr within lock again. */
2617         rc = dt_attr_get(env, obj, la);
2618         if (rc != 0)
2619                 GOTO(unlock, rc);
2620
2621         if (la->la_ctime != 0)
2622                 GOTO(unlock, rc = -ETXTBSY);
2623
2624         th = dt_trans_create(env, dev);
2625         if (IS_ERR(th))
2626                 GOTO(unlock, rc = PTR_ERR(th));
2627
2628         rc = dt_declare_ref_del(env, obj, th);
2629         if (rc != 0)
2630                 GOTO(stop, rc);
2631
2632         rc = dt_declare_destroy(env, obj, th);
2633         if (rc != 0)
2634                 GOTO(stop, rc);
2635
2636         rc = dt_trans_start_local(env, dev, th);
2637         if (rc != 0)
2638                 GOTO(stop, rc);
2639
2640         rc = dt_ref_del(env, obj, th);
2641         if (rc != 0)
2642                 GOTO(stop, rc);
2643
2644         rc = dt_destroy(env, obj, th);
2645         if (rc == 0)
2646                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2647                        "OST-object "DFID" that was created for reparing "
2648                        "dangling referenced case. But the original missing "
2649                        "OST-object is found now.\n",
2650                        lfsck_lfsck2name(lfsck), PFID(fid));
2651
2652         GOTO(stop, rc);
2653
2654 stop:
2655         dt_trans_stop(env, dev, th);
2656
2657 unlock:
2658         dt_write_unlock(env, obj);
2659         ldlm_lock_decref(&lh, LCK_EX);
2660
2661 put:
2662         lfsck_object_put(env, obj);
2663
2664         return rc;
2665 }
2666
2667 /**
2668  * Some OST-object has occupied the specified layout EA slot.
2669  * Such OST-object may be generated by the LFSCK when repair
2670  * dangling referenced MDT-object, which can be indicated by
2671  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2672  * is true and such OST-object has not been modified yet, we
2673  * will replace it with the orphan OST-object; otherwise the
2674  * LFSCK will create new MDT-object to reference the orphan.
2675  *
2676  * \retval       +1: repaired
2677  * \retval        0: did nothing
2678  * \retval      -ve: on error
2679  */
2680 static int lfsck_layout_conflict_create(const struct lu_env *env,
2681                                         struct lfsck_component *com,
2682                                         struct lfsck_tgt_desc *ltd,
2683                                         struct lu_orphan_rec_v3 *rec,
2684                                         struct dt_object *parent,
2685                                         struct lu_fid *cfid,
2686                                         struct lu_buf *ea_buf,
2687                                         struct lov_mds_md_v1 *lmm,
2688                                         struct lov_ost_data_v1 *slot,
2689                                         __u32 ea_off, int lovea_size)
2690 {
2691         struct lfsck_thread_info *info          = lfsck_env_info(env);
2692         struct lu_fid            *cfid2         = &info->lti_fid2;
2693         struct ost_id            *oi            = &info->lti_oi;
2694         struct dt_device         *dev           = lfsck_obj2dev(parent);
2695         struct thandle           *th            = NULL;
2696         struct lustre_handle      lh            = { 0 };
2697         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2698         int                       rc            = 0;
2699         ENTRY;
2700
2701         while (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val)) {
2702                 if (unlikely(!thread_is_running(&com->lc_lfsck->li_thread)))
2703                         RETURN(0);
2704         }
2705
2706         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2707         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2708         if (rc != 0)
2709                 GOTO(out, rc);
2710
2711         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2712                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2713                               LCK_EX);
2714         if (rc != 0)
2715                 GOTO(out, rc);
2716
2717         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2718
2719         /* If the conflict OST-obejct is not created for fixing dangling
2720          * referenced MDT-object in former LFSCK check/repair, or it has
2721          * been modified by others, then we cannot destroy it. Re-create
2722          * a new MDT-object for the orphan OST-object. */
2723         if (rc == -ETXTBSY) {
2724                 /* No need the layout lock on the original parent. */
2725                 lfsck_ibits_unlock(&lh, LCK_EX);
2726
2727                 fid_zero(&rec->lor_rec.lor_fid);
2728                 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2729                          "-"DFID"-%x", PFID(lu_object_fid(&parent->do_lu)),
2730                          ea_off);
2731                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2732                                                 info->lti_tmpbuf, "C", ea_off);
2733
2734                 RETURN(rc);
2735         }
2736
2737         if (rc != 0 && rc != -ENOENT)
2738                 GOTO(unlock, rc);
2739
2740         th = dt_trans_create(env, dev);
2741         if (IS_ERR(th))
2742                 GOTO(unlock, rc = PTR_ERR(th));
2743
2744         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2745                                   LU_XATTR_REPLACE, th);
2746         if (rc != 0)
2747                 GOTO(stop, rc);
2748
2749         rc = dt_trans_start_local(env, dev, th);
2750         if (rc != 0)
2751                 GOTO(stop, rc);
2752
2753         dt_write_lock(env, parent, 0);
2754         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2755         rc = lfsck_layout_refill_lovea(env, com->lc_lfsck, th, parent, cfid,
2756                                        ea_buf, lmm, slot, LU_XATTR_REPLACE,
2757                                        ltd->ltd_index, lovea_size);
2758         dt_write_unlock(env, parent);
2759
2760         GOTO(stop, rc);
2761
2762 stop:
2763         dt_trans_stop(env, dev, th);
2764
2765 unlock:
2766         lfsck_ibits_unlock(&lh, LCK_EX);
2767
2768 out:
2769         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2770                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2771                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2772                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2773                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2774                ea_off, rc);
2775
2776         return rc >= 0 ? 1 : rc;
2777 }
2778
2779 /**
2780  * \retval       +1: repaired
2781  * \retval        0: did nothing
2782  * \retval      -ve: on error
2783  */
2784 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2785                                        struct lfsck_component *com,
2786                                        struct lfsck_tgt_desc *ltd,
2787                                        struct lu_orphan_rec_v3 *rec,
2788                                        struct dt_object *parent,
2789                                        struct lu_fid *cfid,
2790                                        __u32 ost_idx, __u32 ea_off)
2791 {
2792         struct lfsck_thread_info *info          = lfsck_env_info(env);
2793         struct lu_buf            *buf           = &info->lti_big_buf;
2794         struct lu_fid            *fid           = &info->lti_fid2;
2795         struct ost_id            *oi            = &info->lti_oi;
2796         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2797         struct dt_device         *dt            = lfsck_obj2dev(parent);
2798         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2799         struct ost_layout        *ol            = &rec->lor_layout;
2800         struct lov_comp_md_v1    *lcm           = NULL;
2801         struct lov_comp_md_entry_v1 *lcme       = NULL;
2802         struct thandle           *handle        = NULL;
2803         size_t                    lovea_size;
2804         struct lov_mds_md_v1     *lmm;
2805         struct lov_ost_data_v1   *objs;
2806         struct lustre_handle      lh            = { 0 };
2807         __u32                     magic;
2808         __u32 flags = 0;
2809         int                       fl            = 0;
2810         int                       rc            = 0;
2811         int                       rc1;
2812         int                       i;
2813         int pos = 0;
2814         __u16 count;
2815         bool locked = false;
2816         bool new_mirror = true;
2817         ENTRY;
2818
2819         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2820                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2821                               LCK_EX);
2822         if (rc != 0) {
2823                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2824                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2825                        "stripe-index %u, comp_id %u, comp_start %llu, "
2826                        "comp_end %llu, layout version %u, range %u: rc = %d\n",
2827                        lfsck_lfsck2name(lfsck), PFID(cfid),
2828                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off,
2829                        ol->ol_comp_id, ol->ol_comp_start,
2830                        ol->ol_comp_end, rec->lor_layout_version,
2831                        rec->lor_range, rc);
2832
2833                 RETURN(rc);
2834         }
2835
2836 again:
2837         if (locked) {
2838                 dt_write_unlock(env, parent);
2839                 locked = false;
2840         }
2841
2842         if (handle != NULL) {
2843                 dt_trans_stop(env, dt, handle);
2844                 handle = NULL;
2845         }
2846
2847         if (rc < 0)
2848                 GOTO(unlock_layout, rc);
2849
2850         lovea_size = rc;
2851         if (buf->lb_len < lovea_size) {
2852                 lu_buf_realloc(buf, lovea_size);
2853                 if (buf->lb_buf == NULL)
2854                         GOTO(unlock_layout, rc = -ENOMEM);
2855         }
2856
2857         if (!(bk->lb_param & LPF_DRYRUN)) {
2858                 handle = dt_trans_create(env, dt);
2859                 if (IS_ERR(handle))
2860                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2861
2862                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2863                                           fl, handle);
2864                 if (rc != 0)
2865                         GOTO(stop, rc);
2866
2867                 rc = dt_trans_start_local(env, dt, handle);
2868                 if (rc != 0)
2869                         GOTO(stop, rc);
2870         }
2871
2872         dt_write_lock(env, parent, 0);
2873         locked = true;
2874         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV);
2875         if (rc == -ERANGE) {
2876                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV);
2877                 LASSERT(rc != 0);
2878                 goto again;
2879         } else if (rc == -ENODATA || rc == 0) {
2880                 lovea_size = lfsck_lovea_size(ol, ea_off);
2881                 /* If the declared is not big enough, re-try. */
2882                 if (buf->lb_len < lovea_size) {
2883                         rc = lovea_size;
2884                         goto again;
2885                 }
2886                 fl = LU_XATTR_CREATE;
2887         } else if (rc < 0) {
2888                 GOTO(unlock_parent, rc);
2889         } else if (unlikely(buf->lb_len == 0)) {
2890                 goto again;
2891         } else {
2892                 fl = LU_XATTR_REPLACE;
2893                 lovea_size = rc;
2894         }
2895
2896         if (fl == LU_XATTR_CREATE) {
2897                 if (bk->lb_param & LPF_DRYRUN)
2898                         GOTO(unlock_parent, rc = 1);
2899
2900                 LASSERT(buf->lb_len >= lovea_size);
2901
2902                 rc = lfsck_layout_update_lovea(env, lfsck, handle, rec, parent,
2903                                                cfid, buf, fl, ost_idx, ea_off);
2904
2905                 GOTO(unlock_parent, rc);
2906         }
2907
2908         lmm = buf->lb_buf;
2909         rc1 = lfsck_layout_verify_header(parent, lmm, lovea_size);
2910
2911         /* If the LOV EA crashed, the rebuild it. */
2912         if (rc1 == -EINVAL) {
2913                 if (bk->lb_param & LPF_DRYRUN)
2914                         GOTO(unlock_parent, rc = 1);
2915
2916                 LASSERT(buf->lb_len >= lovea_size);
2917
2918                 rc = lfsck_layout_update_lovea(env, lfsck, handle, rec, parent,
2919                                                cfid, buf, fl, ost_idx, ea_off);
2920
2921                 GOTO(unlock_parent, rc);
2922         }
2923
2924         /* For other unknown magic/pattern, keep the current LOV EA. */
2925         if (rc1 == -EOPNOTSUPP)
2926                 GOTO(unlock_parent, rc1 = 0);
2927
2928         if (rc1)
2929                 GOTO(unlock_parent, rc = rc1);
2930
2931         magic = le32_to_cpu(lmm->lmm_magic);
2932         if (magic == LOV_MAGIC_COMP_V1) {
2933                 __u64 start;
2934                 __u64 end;
2935                 __u16 mirror_id0 = mirror_id_of(ol->ol_comp_id);
2936                 __u16 mirror_id1;
2937
2938                 if (bk->lb_param & LPF_DRYRUN)
2939                         GOTO(unlock_parent, rc = 1);
2940
2941                 lcm = buf->lb_buf;
2942                 count = le16_to_cpu(lcm->lcm_entry_count);
2943                 for (i = 0; i < count; pos = ++i) {
2944                         lcme = &lcm->lcm_entries[i];
2945                         start = le64_to_cpu(lcme->lcme_extent.e_start);
2946                         end = le64_to_cpu(lcme->lcme_extent.e_end);
2947                         mirror_id1 = mirror_id_of(le32_to_cpu(lcme->lcme_id));
2948
2949                         if (mirror_id0 > mirror_id1)
2950                                 continue;
2951
2952                         if (mirror_id0 < mirror_id1)
2953                                 break;
2954
2955                         new_mirror = false;
2956                         if (end <= ol->ol_comp_start)
2957                                 continue;
2958
2959                         if (start >= ol->ol_comp_end)
2960                                 break;
2961
2962                         lmm = buf->lb_buf + le32_to_cpu(lcme->lcme_offset);
2963                         magic = le32_to_cpu(lmm->lmm_magic);
2964                         flags = le32_to_cpu(lcme->lcme_flags);
2965                         goto further;
2966                 }
2967
2968                 rc = lfsck_layout_add_comp(env, lfsck, handle, rec, parent,
2969                                 cfid, buf, ost_idx, ea_off, pos, new_mirror);
2970
2971                 GOTO(unlock_parent, rc);
2972         }
2973
2974 further:
2975         count = le16_to_cpu(lmm->lmm_stripe_count);
2976         if (count == 0)
2977                 GOTO(unlock_parent, rc = -EINVAL);
2978         LASSERT(count > 0);
2979
2980         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2981         if (count <= ea_off) {
2982                 if (bk->lb_param & LPF_DRYRUN)
2983                         GOTO(unlock_parent, rc = 1);
2984
2985                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2986                 /* If the declared is not big enough, re-try. */
2987                 if (buf->lb_len < lovea_size) {
2988                         rc = lovea_size;
2989                         goto again;
2990                 }
2991
2992                 if (lcm) {
2993                         LASSERT(lcme);
2994
2995                         lcme->lcme_flags = cpu_to_le32(flags | LCME_FL_INIT);
2996                         lfsck_layout_update_lcm(lcm, lcme,
2997                                                 rec->lor_layout_version,
2998                                                 rec->lor_range);
2999                 }
3000
3001                 rc = lfsck_layout_extend_v1v3_lovea(env, lfsck, handle, ol,
3002                                         parent, cfid, buf, ost_idx, ea_off);
3003
3004                 GOTO(unlock_parent, rc);
3005         }
3006
3007         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
3008
3009         if (magic == LOV_MAGIC_V1) {
3010                 objs = &lmm->lmm_objects[0];
3011         } else {
3012                 LASSERT(magic == LOV_MAGIC_V3);
3013                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3014         }
3015
3016         for (i = 0; i < count; i++, objs++) {
3017                 /* The MDT-object was created via lfsck_layout_recover_create()
3018                  * by others before, and we fill the dummy layout EA. */
3019                 if ((lcme && !(flags & LCME_FL_INIT)) ||
3020                      lovea_slot_is_dummy(objs)) {
3021                         if (i != ea_off)
3022                                 continue;
3023
3024                         if (bk->lb_param & LPF_DRYRUN)
3025                                 GOTO(unlock_parent, rc = 1);
3026
3027                         lmm->lmm_layout_gen =
3028                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
3029                         if (lcme) {
3030                                 LASSERT(lcm);
3031
3032                                 if (le32_to_cpu(lmm->lmm_stripe_size) !=
3033                                         ol->ol_stripe_size ||
3034                                     le16_to_cpu(lmm->lmm_stripe_count) !=
3035                                         ol->ol_stripe_count ||
3036                                     le64_to_cpu(lcme->lcme_extent.e_start) !=
3037                                         ol->ol_comp_start ||
3038                                     le64_to_cpu(lcme->lcme_extent.e_end) !=
3039                                         ol->ol_comp_end) {
3040                                         CDEBUG(D_LFSCK, "%s: found invalid "
3041                                         "component for "DFID ": parent "DFID
3042                                         ", stripe-index %u, stripe_size %u, "
3043                                         "stripe_count %u, comp_id %u, "
3044                                         "comp_start %llu, comp_end %llu, "
3045                                         "cur_stripe_size %u, "
3046                                         "cur_stripe_count %u, "
3047                                         "cur_comp_start %llu, "
3048                                         "cur_comp_end %llu\n",
3049                                         lfsck_lfsck2name(lfsck), PFID(cfid),
3050                                         PFID(lfsck_dto2fid(parent)), ea_off,
3051                                         ol->ol_stripe_size,
3052                                         ol->ol_stripe_count, ol->ol_comp_id,
3053                                         ol->ol_comp_start, ol->ol_comp_end,
3054                                         le32_to_cpu(lmm->lmm_stripe_size),
3055                                         le16_to_cpu(lmm->lmm_stripe_count),
3056                                         le64_to_cpu(lcme->lcme_extent.e_start),
3057                                         le64_to_cpu(lcme->lcme_extent.e_end));
3058
3059                                         GOTO(unlock_parent, rc = -EINVAL);
3060                                 }
3061
3062                                 lovea_size = le32_to_cpu(lcm->lcm_size);
3063                                 lcme->lcme_flags = cpu_to_le32(flags |
3064                                                                LCME_FL_INIT);
3065                                 lfsck_layout_update_lcm(lcm, lcme,
3066                                                         rec->lor_layout_version,
3067                                                         rec->lor_range);
3068                         }
3069
3070                         LASSERTF(buf->lb_len >= lovea_size,
3071                                  "buffer len %d is less than real size %d\n",
3072                                  (int)buf->lb_len, (int)lovea_size);
3073
3074                         rc = lfsck_layout_refill_lovea(env, lfsck, handle,
3075                                                 parent, cfid, buf, lmm, objs,
3076                                                 fl, ost_idx, lovea_size);
3077
3078                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
3079                                "dummy layout slot for "DFID": parent "DFID
3080                                ", OST-index %u, stripe-index %u: rc = %d\n",
3081                                lfsck_lfsck2name(lfsck), PFID(cfid),
3082                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
3083
3084                         GOTO(unlock_parent, rc);
3085                 }
3086
3087                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3088                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
3089                 if (rc != 0) {
3090                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3091                                "invalid layout EA at the slot %d, index %u\n",
3092  &nb