Whamcloud - gitweb
LU-9019 lfsck: migrate to 64 bit time
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2016, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/*
 * Known revisions of the layout LFSCK magic (compare lfsck_layout::ll_magic).
 * What changed between the revisions is not visible in this part of the file.
 */
#define LFSCK_LAYOUT_MAGIC_V1           0xB173AE14
#define LFSCK_LAYOUT_MAGIC_V2           0xB1734D76
#define LFSCK_LAYOUT_MAGIC_V3           0xB17371B9
#define LFSCK_LAYOUT_MAGIC_V4           0xB1732FED

/* The magic currently in use is the V4 one. */
#define LFSCK_LAYOUT_MAGIC              LFSCK_LAYOUT_MAGIC_V4
57
/*
 * Per-sequence bookkeeping record.
 *
 * NOTE(review): none of these fields is used in the visible part of the file,
 * so the notes below are inferred from the names only - confirm against the
 * users of this structure.
 */
struct lfsck_layout_seq {
        /* presumably links into lfsck_layout_slave_data::llsd_seq_list */
        struct list_head         lls_list;
        /* presumably the FID sequence this record describes */
        __u64                    lls_seq;
        __u64                    lls_lastid;
        __u64                    lls_lastid_known;
        struct dt_object        *lls_lastid_obj;
        unsigned int             lls_dirty:1;
};
66
/*
 * One master target that takes part in the layout verification, as tracked
 * on the slave side. Instances are reference counted (llst_ref) and are
 * freed by lfsck_layout_llst_put() when the last reference is dropped.
 */
struct lfsck_layout_slave_target {
        /* link into lfsck_layout_slave_data::llsd_master_list. */
        struct list_head        llst_list;
        /* The position for next record in the rbtree for iteration. */
        struct lu_fid           llst_fid;
        /* Dummy hash for iteration against the rbtree. */
        __u64                   llst_hash;
        /* Set to zero when the target is created by lfsck_layout_llst_add(). */
        __u64                   llst_gen;
        /* Reference count; starts at 1 for the list's reference. */
        atomic_t                llst_ref;
        /* Index of the target; the lookup key in llsd_master_list. */
        __u32                   llst_index;
        /* How many times we have failed to get the master status. */
        int                     llst_failures;
};
80
/*
 * Slave-side private data of the layout LFSCK component (reached through
 * lfsck_component::lc_data in the rbtree helpers below).
 */
struct lfsck_layout_slave_data {
        /* list for lfsck_layout_seq */
        struct list_head         llsd_seq_list;

        /* list for the masters involve layout verification. */
        struct list_head         llsd_master_list;
        /* Taken around every walk or update of llsd_master_list. */
        spinlock_t               llsd_lock;
        __u64                    llsd_touch_gen;
        /* In-RAM object standing for the rbtree, see lfsck_rbtree_setup(). */
        struct dt_object        *llsd_rb_obj;
        /* Root of the tree of lfsck_rbtree_node (OST-object bitmaps). */
        struct rb_root           llsd_rb_root;
        /* Guards llsd_rb_root lookups/inserts and llsd_rbtree_valid. */
        rwlock_t                 llsd_rb_lock;
        /* Cleared by lfsck_rbtree_cleanup(); nobody may use the tree then. */
        unsigned int             llsd_rbtree_valid:1;
};
94
/*
 * Argument bundle for an asynchronous request issued by the slave.
 *
 * NOTE(review): the users are outside the visible part of the file; the
 * per-field notes are inferred from the types and names - confirm.
 */
struct lfsck_layout_slave_async_args {
        /* presumably the export the request is sent through */
        struct obd_export                *llsaa_exp;
        /* presumably the layout LFSCK component that issued the request */
        struct lfsck_component           *llsaa_com;
        /* presumably the master target the request is about */
        struct lfsck_layout_slave_target *llsaa_llst;
};
100
101 static inline bool lfsck_comp_extent_aligned(__u64 size)
102 {
103          return (size & (LOV_MIN_STRIPE_SIZE - 1)) == 0;
104 }
105
106 static inline void
107 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
108 {
109         if (atomic_dec_and_test(&llst->llst_ref)) {
110                 LASSERT(list_empty(&llst->llst_list));
111
112                 OBD_FREE_PTR(llst);
113         }
114 }
115
116 static inline int
117 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
118 {
119         struct lfsck_layout_slave_target *llst;
120         struct lfsck_layout_slave_target *tmp;
121         int                               rc   = 0;
122
123         OBD_ALLOC_PTR(llst);
124         if (llst == NULL)
125                 return -ENOMEM;
126
127         INIT_LIST_HEAD(&llst->llst_list);
128         llst->llst_gen = 0;
129         llst->llst_index = index;
130         atomic_set(&llst->llst_ref, 1);
131
132         spin_lock(&llsd->llsd_lock);
133         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
134                 if (tmp->llst_index == index) {
135                         rc = -EALREADY;
136                         break;
137                 }
138         }
139         if (rc == 0)
140                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
141         spin_unlock(&llsd->llsd_lock);
142
143         if (rc != 0)
144                 OBD_FREE_PTR(llst);
145
146         return rc;
147 }
148
149 static inline void
150 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
151                       struct lfsck_layout_slave_target *llst)
152 {
153         bool del = false;
154
155         spin_lock(&llsd->llsd_lock);
156         if (!list_empty(&llst->llst_list)) {
157                 list_del_init(&llst->llst_list);
158                 del = true;
159         }
160         spin_unlock(&llsd->llsd_lock);
161
162         if (del)
163                 lfsck_layout_llst_put(llst);
164 }
165
166 static inline struct lfsck_layout_slave_target *
167 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
168                                __u32 index, bool unlink)
169 {
170         struct lfsck_layout_slave_target *llst;
171
172         spin_lock(&llsd->llsd_lock);
173         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
174                 if (llst->llst_index == index) {
175                         if (unlink)
176                                 list_del_init(&llst->llst_list);
177                         else
178                                 atomic_inc(&llst->llst_ref);
179                         spin_unlock(&llsd->llsd_lock);
180
181                         return llst;
182                 }
183         }
184         spin_unlock(&llsd->llsd_lock);
185
186         return NULL;
187 }
188
189 static struct lfsck_layout_req *
190 lfsck_layout_assistant_req_init(struct lfsck_assistant_object *lso,
191                                 struct dt_object *child, __u32 comp_id,
192                                 __u32 ost_idx, __u32 lov_idx)
193 {
194         struct lfsck_layout_req *llr;
195
196         OBD_ALLOC_PTR(llr);
197         if (llr == NULL)
198                 return ERR_PTR(-ENOMEM);
199
200         INIT_LIST_HEAD(&llr->llr_lar.lar_list);
201         llr->llr_lar.lar_parent = lfsck_assistant_object_get(lso);
202         llr->llr_child = child;
203         llr->llr_comp_id = comp_id;
204         llr->llr_ost_idx = ost_idx;
205         llr->llr_lov_idx = lov_idx;
206
207         return llr;
208 }
209
210 static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
211                                             struct lfsck_assistant_req *lar)
212 {
213         struct lfsck_layout_req *llr =
214                         container_of0(lar, struct lfsck_layout_req, llr_lar);
215
216         lfsck_object_put(env, llr->llr_child);
217         lfsck_assistant_object_put(env, lar->lar_parent);
218         OBD_FREE_PTR(llr);
219 }
220
221 static int
222 lfsck_layout_assistant_sync_failures_interpret(const struct lu_env *env,
223                                                struct ptlrpc_request *req,
224                                                void *args, int rc)
225 {
226         if (rc == 0) {
227                 struct lfsck_async_interpret_args *laia = args;
228                 struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
229
230                 ltd->ltd_synced_failures = 1;
231                 atomic_dec(laia->laia_count);
232         }
233
234         return 0;
235 }
236
237 /**
238  * Notify remote LFSCK instances about former failures.
239  *
240  * The local LFSCK instance has recorded which OSTs have ever failed to respond
241  * some LFSCK verification requests (maybe because of network issues or the OST
242  * itself trouble). During the respond gap, the OST may missed some OST-objects
243  * verification, then the OST cannot know whether related OST-objects have been
244  * referenced by related MDT-objects or not, then in the second-stage scanning,
245  * these OST-objects will be regarded as orphan, if the OST-object contains bad
246  * parent FID for back reference, then it will misguide the LFSCK to make wrong
247  * fixing for the fake orphan.
248  *
249  * To avoid above trouble, when layout LFSCK finishes the first-stage scanning,
250  * it will scan the bitmap for the ever failed OSTs, and notify them that they
251  * have ever missed some OST-object verification and should skip the handling
252  * for orphan OST-objects on all MDTs that are in the layout LFSCK.
253  *
254  * \param[in] env       pointer to the thread context
255  * \param[in] com       pointer to the lfsck component
256  * \param[in] lr        pointer to the lfsck request
257  */
258 static void lfsck_layout_assistant_sync_failures(const struct lu_env *env,
259                                                  struct lfsck_component *com,
260                                                  struct lfsck_request *lr)
261 {
262         struct lfsck_async_interpret_args *laia  =
263                                 &lfsck_env_info(env)->lti_laia2;
264         struct lfsck_assistant_data       *lad   = com->lc_data;
265         struct lfsck_layout               *lo    = com->lc_file_ram;
266         struct lfsck_instance             *lfsck = com->lc_lfsck;
267         struct lfsck_tgt_descs            *ltds  = &lfsck->li_ost_descs;
268         struct lfsck_tgt_desc             *ltd;
269         struct ptlrpc_request_set         *set;
270         atomic_t                           count;
271         __u32                              idx;
272         int                                rc    = 0;
273         ENTRY;
274
275         if (!lad->lad_incomplete)
276                 RETURN_EXIT;
277
278         /* If the MDT has ever failed to verfiy some OST-objects,
279          * then sync failures with them firstly. */
280         lr->lr_flags2 = lo->ll_flags | LF_INCOMPLETE;
281
282         atomic_set(&count, 0);
283         memset(laia, 0, sizeof(*laia));
284         laia->laia_count = &count;
285         set = ptlrpc_prep_set();
286         if (set == NULL)
287                 GOTO(out, rc = -ENOMEM);
288
289         down_read(&ltds->ltd_rw_sem);
290         cfs_foreach_bit(lad->lad_bitmap, idx) {
291                 ltd = lfsck_ltd2tgt(ltds, idx);
292                 if (unlikely(!ltd))
293                         continue;
294
295                 laia->laia_ltd = ltd;
296                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
297                                 lfsck_layout_assistant_sync_failures_interpret,
298                                 laia, LFSCK_NOTIFY);
299                 if (rc != 0) {
300                         CDEBUG(D_LFSCK, "%s: LFSCK assistant fail to "
301                                "notify target %x for %s phase1 done: "
302                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
303                                ltd->ltd_index, lad->lad_name, rc);
304
305                         break;
306                 }
307
308                 atomic_inc(&count);
309         }
310         up_read(&ltds->ltd_rw_sem);
311
312         if (rc == 0 && atomic_read(&count) > 0)
313                 rc = ptlrpc_set_wait(set);
314
315         ptlrpc_set_destroy(set);
316
317         if (rc == 0 && atomic_read(&count) > 0)
318                 rc = -EINVAL;
319
320         GOTO(out, rc);
321
322 out:
323         if (rc != 0)
324                 /* If failed to sync failures with the OSTs, then have to
325                  * mark the whole LFSCK as LF_INCOMPLETE to skip the whole
326                  * subsequent orphan OST-object handling. */
327                 lo->ll_flags |= LF_INCOMPLETE;
328
329         lr->lr_flags2 = lo->ll_flags;
330 }
331
332 static int lfsck_layout_verify_header_v1v3(struct dt_object *obj,
333                                            struct lov_mds_md_v1 *lmm)
334 {
335         __u32 magic;
336         __u32 pattern;
337
338         magic = le32_to_cpu(lmm->lmm_magic);
339         /* If magic crashed, keep it there. Sometime later, during OST-object
340          * orphan handling, if some OST-object(s) back-point to it, it can be
341          * verified and repaired. */
342         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
343                 int rc;
344
345                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
346                         rc = -EOPNOTSUPP;
347                 else
348                         rc = -EINVAL;
349
350                 CDEBUG(D_LFSCK, "%s LOV EA magic %u for the file "DFID"\n",
351                        rc == -EINVAL ? "Unknown" : "Unsupported",
352                        magic, PFID(lfsck_dto2fid(obj)));
353
354                 return rc;
355         }
356
357         pattern = le32_to_cpu(lmm->lmm_pattern);
358         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
359         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
360                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u for the file "
361                        DFID"\n", pattern, PFID(lfsck_dto2fid(obj)));
362
363                 return -EOPNOTSUPP;
364         }
365
366         return 0;
367 }
368
369 static int lfsck_layout_verify_header(struct dt_object *obj,
370                                       struct lov_mds_md_v1 *lmm)
371 {
372         int rc = 0;
373
374         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) {
375                 struct lov_comp_md_v1 *lcm = (struct lov_comp_md_v1 *)lmm;
376                 int i;
377                 __u16 count = le16_to_cpu(lcm->lcm_entry_count);
378
379                 if (unlikely(count == 0)) {
380                         CDEBUG(D_LFSCK, "the PFL file "DFID" contains invalid "
381                                "components count 0\n",
382                                PFID(lfsck_dto2fid(obj)));
383
384                         return -EINVAL;
385                 }
386
387                 for (i = 0; i < count; i++) {
388                         struct lov_comp_md_entry_v1 *lcme =
389                                                 &lcm->lcm_entries[i];
390                         __u64 start = le64_to_cpu(lcme->lcme_extent.e_start);
391                         __u64 end = le64_to_cpu(lcme->lcme_extent.e_end);
392                         __u32 comp_id = le32_to_cpu(lcme->lcme_id);
393
394                         if (unlikely(comp_id == LCME_ID_INVAL ||
395                                      comp_id > LCME_ID_MAX)) {
396                                 CDEBUG(D_LFSCK, "found invalid FPL ID %u "
397                                        "for the file "DFID" at idx %d\n",
398                                        comp_id, PFID(lfsck_dto2fid(obj)), i);
399
400                                 return -EINVAL;
401                         }
402
403                         if (unlikely(start >= end ||
404                                      !lfsck_comp_extent_aligned(start) ||
405                                      (!lfsck_comp_extent_aligned(end) &&
406                                       end != LUSTRE_EOF))) {
407                                 CDEBUG(D_LFSCK, "found invalid FPL extent "
408                                        "range [%llu - %llu) for the file "
409                                        DFID" at idx %d\n",
410                                        start, end, PFID(lfsck_dto2fid(obj)), i);
411
412                                 return -EINVAL;
413                         }
414
415                         rc = lfsck_layout_verify_header_v1v3(obj,
416                                 (struct lov_mds_md_v1 *)((char *)lmm +
417                                 le32_to_cpu(lcme->lcme_offset)));
418                         if (rc)
419                                 return rc;
420                 }
421         } else {
422                 rc = lfsck_layout_verify_header_v1v3(obj, lmm);
423         }
424
425         return rc;
426 }
427
428 static int lfsck_layout_get_lovea(const struct lu_env *env,
429                                   struct dt_object *obj, struct lu_buf *buf)
430 {
431         int rc;
432         int rc1;
433
434 again:
435         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV);
436         if (rc == -ERANGE) {
437                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV);
438                 if (rc <= 0)
439                         return rc;
440
441                 lu_buf_realloc(buf, rc);
442                 if (buf->lb_buf == NULL)
443                         return -ENOMEM;
444
445                 goto again;
446         }
447
448         if (rc == -ENODATA)
449                 rc = 0;
450
451         if (rc <= 0)
452                 return rc;
453
454         if (unlikely(buf->lb_buf == NULL)) {
455                 lu_buf_alloc(buf, rc);
456                 if (buf->lb_buf == NULL)
457                         return -ENOMEM;
458
459                 goto again;
460         }
461
462         rc1 = lfsck_layout_verify_header(obj, buf->lb_buf);
463
464         return rc1 ? rc1 : rc;
465 }
466
/*
 * Geometry of the per-node bitmaps of the layout rbtree:
 * SIZE is the bitmap size in bytes (one page), WIDTH the number of bits in
 * it (SIZE * 8), i.e. the number of consecutive OIDs covered by one node,
 * and MASK splits an OID into the node's first OID (oid & ~MASK) and the
 * bit index inside the node (oid & MASK).
 */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
470
/*
 * One node of the layout rbtree. It covers LFSCK_RBTREE_BITMAP_WIDTH
 * consecutive OIDs of a single FID sequence and records, one bit per OID,
 * which objects are known and which have been accessed.
 */
struct lfsck_rbtree_node {
        /* Linkage into lfsck_layout_slave_data::llsd_rb_root. */
        struct rb_node   lrn_node;
        /* The FID sequence covered by this node. */
        __u64            lrn_seq;
        /* The first OID covered: an OID with the bitmap-index bits cleared. */
        __u32            lrn_first_oid;
        /* Number of bits set in lrn_known_bitmap. */
        atomic_t         lrn_known_count;
        /* Number of bits set in lrn_accessed_bitmap. */
        atomic_t         lrn_accessed_count;
        /* LFSCK_RBTREE_BITMAP_SIZE bytes; bit N is for lrn_first_oid + N. */
        void            *lrn_known_bitmap;
        /* Same geometry as lrn_known_bitmap. */
        void            *lrn_accessed_bitmap;
};
480
481 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
482                                    __u64 seq, __u32 oid)
483 {
484         if (seq < lrn->lrn_seq)
485                 return -1;
486
487         if (seq > lrn->lrn_seq)
488                 return 1;
489
490         if (oid < lrn->lrn_first_oid)
491                 return -1;
492
493         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
494                 return 1;
495
496         return 0;
497 }
498
499 /* The caller should hold llsd->llsd_rb_lock. */
500 static struct lfsck_rbtree_node *
501 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
502                     const struct lu_fid *fid, bool *exact)
503 {
504         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
505         struct rb_node           *prev  = NULL;
506         struct lfsck_rbtree_node *lrn   = NULL;
507         int                       rc    = 0;
508
509         if (exact != NULL)
510                 *exact = true;
511
512         while (node != NULL) {
513                 prev = node;
514                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
515                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
516                 if (rc < 0)
517                         node = node->rb_left;
518                 else if (rc > 0)
519                         node = node->rb_right;
520                 else
521                         return lrn;
522         }
523
524         if (exact == NULL)
525                 return NULL;
526
527         /* If there is no exactly matched one, then to the next valid one. */
528         *exact = false;
529
530         /* The rbtree is empty. */
531         if (rc == 0)
532                 return NULL;
533
534         if (rc < 0)
535                 return lrn;
536
537         node = rb_next(prev);
538
539         /* The end of the rbtree. */
540         if (node == NULL)
541                 return NULL;
542
543         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
544
545         return lrn;
546 }
547
548 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
549                                                   const struct lu_fid *fid)
550 {
551         struct lfsck_rbtree_node *lrn;
552
553         OBD_ALLOC_PTR(lrn);
554         if (lrn == NULL)
555                 return ERR_PTR(-ENOMEM);
556
557         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
558         if (lrn->lrn_known_bitmap == NULL) {
559                 OBD_FREE_PTR(lrn);
560
561                 return ERR_PTR(-ENOMEM);
562         }
563
564         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
565         if (lrn->lrn_accessed_bitmap == NULL) {
566                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
567                 OBD_FREE_PTR(lrn);
568
569                 return ERR_PTR(-ENOMEM);
570         }
571
572         RB_CLEAR_NODE(&lrn->lrn_node);
573         lrn->lrn_seq = fid_seq(fid);
574         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
575         atomic_set(&lrn->lrn_known_count, 0);
576         atomic_set(&lrn->lrn_accessed_count, 0);
577
578         return lrn;
579 }
580
581 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
582 {
583         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
584         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
585         OBD_FREE_PTR(lrn);
586 }
587
588 /* The caller should hold lock. */
589 static struct lfsck_rbtree_node *
590 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
591                     struct lfsck_rbtree_node *lrn)
592 {
593         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
594         struct rb_node            *parent = NULL;
595         struct lfsck_rbtree_node  *tmp;
596         int                        rc;
597
598         while (*pos != NULL) {
599                 parent = *pos;
600                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
601                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
602                 if (rc < 0)
603                         pos = &(*pos)->rb_left;
604                 else if (rc > 0)
605                         pos = &(*pos)->rb_right;
606                 else
607                         return tmp;
608         }
609
610         rb_link_node(&lrn->lrn_node, parent, pos);
611         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
612
613         return lrn;
614 }
615
/* Index operations that lfsck_rbtree_setup() installs on the in-RAM rbtree
 * object; defined outside the visible part of the file. */
extern const struct dt_index_operations lfsck_orphan_index_ops;
617
618 static int lfsck_rbtree_setup(const struct lu_env *env,
619                               struct lfsck_component *com)
620 {
621         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
622         struct lfsck_instance           *lfsck  = com->lc_lfsck;
623         struct dt_device                *dev    = lfsck->li_bottom;
624         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
625         struct dt_object                *obj;
626
627         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
628         fid->f_oid = lfsck_dev_idx(lfsck);
629         fid->f_ver = 0;
630         obj = dt_locate(env, dev, fid);
631         if (IS_ERR(obj))
632                 RETURN(PTR_ERR(obj));
633
634         /* Generate an in-RAM object to stand for the layout rbtree.
635          * Scanning the layout rbtree will be via the iteration over
636          * the object. In the future, the rbtree may be written onto
637          * disk with the object.
638          *
639          * Mark the object to be as exist. */
640         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
641         obj->do_index_ops = &lfsck_orphan_index_ops;
642         llsd->llsd_rb_obj = obj;
643         llsd->llsd_rbtree_valid = 1;
644         dev->dd_record_fid_accessed = 1;
645
646         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
647                lfsck_lfsck2name(lfsck));
648
649         return 0;
650 }
651
652 static void lfsck_rbtree_cleanup(const struct lu_env *env,
653                                  struct lfsck_component *com)
654 {
655         struct lfsck_instance           *lfsck = com->lc_lfsck;
656         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
657         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
658         struct rb_node                  *next;
659         struct lfsck_rbtree_node        *lrn;
660
661         lfsck->li_bottom->dd_record_fid_accessed = 0;
662         /* Invalid the rbtree, then no others will use it. */
663         write_lock(&llsd->llsd_rb_lock);
664         llsd->llsd_rbtree_valid = 0;
665         write_unlock(&llsd->llsd_rb_lock);
666
667         while (node != NULL) {
668                 next = rb_next(node);
669                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
670                 rb_erase(node, &llsd->llsd_rb_root);
671                 lfsck_rbtree_free(lrn);
672                 node = next;
673         }
674
675         if (llsd->llsd_rb_obj != NULL) {
676                 lfsck_object_put(env, llsd->llsd_rb_obj);
677                 llsd->llsd_rb_obj = NULL;
678         }
679
680         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
681                lfsck_lfsck2name(lfsck));
682 }
683
/*
 * Record in the layout rbtree that an OST-object is known and, optionally,
 * that it has been accessed.
 *
 * Only sane IDIF or normal FIDs other than LAST_ID are recorded. The node
 * covering the FID is created on demand. If that fails while recording an
 * access, the bitmaps can no longer be trusted: the LFSCK is then flagged
 * LF_INCOMPLETE and the whole rbtree is torn down.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the lfsck component
 * \param[in] fid       the FID of the OST-object
 * \param[in] accessed  true to mark the object as accessed as well
 */
static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com,
                                       const struct lu_fid *fid,
                                       bool accessed)
{
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct lfsck_rbtree_node        *lrn;
        /* Tells which flavour of llsd_rb_lock is held: write if true. */
        bool                             insert = false;
        int                              idx;
        int                              rc     = 0;
        ENTRY;

        if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
                RETURN_EXIT;

        if (!fid_is_idif(fid) && !fid_is_norm(fid))
                RETURN_EXIT;

        read_lock(&llsd->llsd_rb_lock);
        if (!llsd->llsd_rbtree_valid)
                GOTO(unlock, rc = 0);

        lrn = lfsck_rbtree_search(llsd, fid, NULL);
        if (lrn == NULL) {
                struct lfsck_rbtree_node *tmp;

                LASSERT(!insert);

                /* Allocate the new node with no lock held, then switch to
                 * the write lock for the insertion. */
                read_unlock(&llsd->llsd_rb_lock);
                tmp = lfsck_rbtree_new(env, fid);
                if (IS_ERR(tmp))
                        GOTO(out, rc = PTR_ERR(tmp));

                insert = true;
                write_lock(&llsd->llsd_rb_lock);
                /* The rbtree may have been invalidated while unlocked. */
                if (!llsd->llsd_rbtree_valid) {
                        lfsck_rbtree_free(tmp);
                        GOTO(unlock, rc = 0);
                }

                /* Somebody else may have inserted a node for the same range
                 * in the meantime; then use that one and drop ours. */
                lrn = lfsck_rbtree_insert(llsd, tmp);
                if (lrn != tmp)
                        lfsck_rbtree_free(tmp);
        }

        idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
        /* Any accessed object must be a known object. */
        if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
                atomic_inc(&lrn->lrn_known_count);
        if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
                atomic_inc(&lrn->lrn_accessed_count);

        GOTO(unlock, rc = 0);

unlock:
        if (insert)
                write_unlock(&llsd->llsd_rb_lock);
        else
                read_unlock(&llsd->llsd_rb_lock);
out:
        /* A failure to record a merely known (not accessed) object is
         * ignored here. */
        if (rc != 0 && accessed) {
                struct lfsck_layout *lo = com->lc_file_ram;

                CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
                       "bitmap, and will cause incorrect LFSCK OST-object "
                       "handling, so disable it to cancel orphan handling "
                       "for related device. rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), rc);

                lo->ll_flags |= LF_INCOMPLETE;
                lfsck_rbtree_cleanup(env, com);
        }
}
757
758 static inline void lldk_le_to_cpu(struct lfsck_layout_dangling_key *des,
759                                   const struct lfsck_layout_dangling_key *src)
760 {
761         fid_le_to_cpu(&des->lldk_fid, &src->lldk_fid);
762         des->lldk_comp_id = le32_to_cpu(src->lldk_comp_id);
763         des->lldk_ea_off = le32_to_cpu(src->lldk_ea_off);
764 }
765
766 static inline void lldk_cpu_to_le(struct lfsck_layout_dangling_key *des,
767                                   const struct lfsck_layout_dangling_key *src)
768 {
769         fid_cpu_to_le(&des->lldk_fid, &src->lldk_fid);
770         des->lldk_comp_id = cpu_to_le32(src->lldk_comp_id);
771         des->lldk_ea_off = cpu_to_le32(src->lldk_ea_off);
772 }
773
774 static inline void lldk_be_to_cpu(struct lfsck_layout_dangling_key *des,
775                                   const struct lfsck_layout_dangling_key *src)
776 {
777         fid_be_to_cpu(&des->lldk_fid, &src->lldk_fid);
778         des->lldk_comp_id = be32_to_cpu(src->lldk_comp_id);
779         des->lldk_ea_off = be32_to_cpu(src->lldk_ea_off);
780 }
781
782 static inline void lldk_cpu_to_be(struct lfsck_layout_dangling_key *des,
783                                   const struct lfsck_layout_dangling_key *src)
784 {
785         fid_cpu_to_be(&des->lldk_fid, &src->lldk_fid);
786         des->lldk_comp_id = cpu_to_be32(src->lldk_comp_id);
787         des->lldk_ea_off = cpu_to_be32(src->lldk_ea_off);
788 }
789
790 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
791                                    const struct lfsck_layout *src)
792 {
793         int i;
794
795         des->ll_magic = le32_to_cpu(src->ll_magic);
796         des->ll_status = le32_to_cpu(src->ll_status);
797         des->ll_flags = le32_to_cpu(src->ll_flags);
798         des->ll_success_count = le32_to_cpu(src->ll_success_count);
799         des->ll_run_time_phase1 = le64_to_cpu(src->ll_run_time_phase1);
800         des->ll_run_time_phase2 = le64_to_cpu(src->ll_run_time_phase2);
801         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
802         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
803         des->ll_time_last_checkpoint =
804                                 le64_to_cpu(src->ll_time_last_checkpoint);
805         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
806         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
807         des->ll_pos_first_inconsistent =
808                         le64_to_cpu(src->ll_pos_first_inconsistent);
809         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
810         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
811         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
812         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
813         for (i = 0; i < LLIT_MAX; i++)
814                 des->ll_objs_repaired[i] =
815                                 le64_to_cpu(src->ll_objs_repaired[i]);
816         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
817         des->ll_bitmap_size = le32_to_cpu(src->ll_bitmap_size);
818         lldk_le_to_cpu(&des->ll_lldk_latest_scanned_phase2,
819                        &src->ll_lldk_latest_scanned_phase2);
820 }
821
822 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
823                                    const struct lfsck_layout *src)
824 {
825         int i;
826
827         des->ll_magic = cpu_to_le32(src->ll_magic);
828         des->ll_status = cpu_to_le32(src->ll_status);
829         des->ll_flags = cpu_to_le32(src->ll_flags);
830         des->ll_success_count = cpu_to_le32(src->ll_success_count);
831         des->ll_run_time_phase1 = cpu_to_le64(src->ll_run_time_phase1);
832         des->ll_run_time_phase2 = cpu_to_le64(src->ll_run_time_phase2);
833         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
834         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
835         des->ll_time_last_checkpoint =
836                                 cpu_to_le64(src->ll_time_last_checkpoint);
837         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
838         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
839         des->ll_pos_first_inconsistent =
840                         cpu_to_le64(src->ll_pos_first_inconsistent);
841         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
842         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
843         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
844         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
845         for (i = 0; i < LLIT_MAX; i++)
846                 des->ll_objs_repaired[i] =
847                                 cpu_to_le64(src->ll_objs_repaired[i]);
848         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
849         des->ll_bitmap_size = cpu_to_le32(src->ll_bitmap_size);
850         lldk_cpu_to_le(&des->ll_lldk_latest_scanned_phase2,
851                        &src->ll_lldk_latest_scanned_phase2);
852 }
853
854 /**
855  * Load the OST bitmap from the lfsck_layout trace file.
856  *
857  * \param[in] env       pointer to the thread context
858  * \param[in] com       pointer to the lfsck component
859  *
860  * \retval              0 for success
861  * \retval              negative error number on failure or data corruption
862  */
863 static int lfsck_layout_load_bitmap(const struct lu_env *env,
864                                     struct lfsck_component *com)
865 {
866         struct dt_object                *obj    = com->lc_obj;
867         struct lfsck_assistant_data     *lad    = com->lc_data;
868         struct lfsck_layout             *lo     = com->lc_file_ram;
869         struct cfs_bitmap                       *bitmap = lad->lad_bitmap;
870         loff_t                           pos    = com->lc_file_size;
871         ssize_t                          size;
872         __u32                            nbits;
873         int                              rc;
874         ENTRY;
875
876         if (com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size >
877             lo->ll_bitmap_size)
878                 nbits = com->lc_lfsck->li_ost_descs.ltd_tgts_bitmap->size;
879         else
880                 nbits = lo->ll_bitmap_size;
881
882         if (unlikely(nbits < BITS_PER_LONG))
883                 nbits = BITS_PER_LONG;
884
885         if (nbits > bitmap->size) {
886                 __u32 new_bits = bitmap->size;
887                 struct cfs_bitmap *new_bitmap;
888
889                 while (new_bits < nbits)
890                         new_bits <<= 1;
891
892                 new_bitmap = CFS_ALLOCATE_BITMAP(new_bits);
893                 if (new_bitmap == NULL)
894                         RETURN(-ENOMEM);
895
896                 lad->lad_bitmap = new_bitmap;
897                 CFS_FREE_BITMAP(bitmap);
898                 bitmap = new_bitmap;
899         }
900
901         if (lo->ll_bitmap_size == 0) {
902                 lad->lad_incomplete = 0;
903                 CFS_RESET_BITMAP(bitmap);
904
905                 RETURN(0);
906         }
907
908         size = (lo->ll_bitmap_size + 7) >> 3;
909         rc = dt_read(env, obj, lfsck_buf_get(env, bitmap->data, size), &pos);
910         if (rc != size)
911                 RETURN(rc >= 0 ? -EINVAL : rc);
912
913         if (cfs_bitmap_check_empty(bitmap))
914                 lad->lad_incomplete = 0;
915         else
916                 lad->lad_incomplete = 1;
917
918         RETURN(0);
919 }
920
921 /**
922  * Load the layout LFSCK trace file from disk.
923  *
924  * The layout LFSCK trace file records the layout LFSCK status information
925  * and other statistics, such as how many objects have been scanned, and how
926  * many objects have been repaired, and etc. It also contains the bitmap for
927  * failed OSTs during the layout LFSCK. All these information will be loaded
928  * from disk to RAM when the layout LFSCK component setup.
929  *
930  * \param[in] env       pointer to the thread context
931  * \param[in] com       pointer to the lfsck component
932  *
933  * \retval              positive number for file data corruption, the caller
934  *                      should reset the layout LFSCK trace file
935  * \retval              0 for success
936  * \retval              negative error number on failure
937  */
938 static int lfsck_layout_load(const struct lu_env *env,
939                              struct lfsck_component *com)
940 {
941         struct lfsck_layout             *lo     = com->lc_file_ram;
942         ssize_t                          size   = com->lc_file_size;
943         loff_t                           pos    = 0;
944         int                              rc;
945
946         rc = dt_read(env, com->lc_obj,
947                      lfsck_buf_get(env, com->lc_file_disk, size), &pos);
948         if (rc == 0) {
949                 return -ENOENT;
950         } else if (rc < 0) {
951                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
952                        lfsck_lfsck2name(com->lc_lfsck), rc);
953                 return rc;
954         } else if (rc != size) {
955                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
956                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
957                 return 1;
958         }
959
960         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
961         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
962                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
963                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
964                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
965                 return 1;
966         }
967
968         return 0;
969 }
970
971 /**
972  * Store the layout LFSCK trace file on disk.
973  *
974  * The layout LFSCK trace file records the layout LFSCK status information
975  * and other statistics, such as how many objects have been scanned, and how
976  * many objects have been repaired, and etc. It also contains the bitmap for
977  * failed OSTs during the layout LFSCK. All these information will be synced
978  * from RAM to disk periodically.
979  *
980  * \param[in] env       pointer to the thread context
981  * \param[in] com       pointer to the lfsck component
982  *
983  * \retval              0 for success
984  * \retval              negative error number on failure
985  */
986 static int lfsck_layout_store(const struct lu_env *env,
987                               struct lfsck_component *com)
988 {
989         struct dt_object        *obj    = com->lc_obj;
990         struct lfsck_instance   *lfsck  = com->lc_lfsck;
991         struct lfsck_layout     *lo_ram = com->lc_file_ram;
992         struct lfsck_layout     *lo     = com->lc_file_disk;
993         struct thandle          *th;
994         struct dt_device        *dev    = lfsck_obj2dev(obj);
995         struct cfs_bitmap       *bitmap = NULL;
996         loff_t                   pos;
997         ssize_t                  size   = com->lc_file_size;
998         __u32                    nbits  = 0;
999         int                      rc;
1000         ENTRY;
1001
1002         if (lfsck->li_master) {
1003                 struct lfsck_assistant_data *lad = com->lc_data;
1004
1005                 bitmap = lad->lad_bitmap;
1006                 nbits = bitmap->size;
1007
1008                 LASSERT(nbits > 0);
1009                 LASSERTF((nbits & 7) == 0, "Invalid nbits %u\n", nbits);
1010         }
1011
1012         lo_ram->ll_bitmap_size = nbits;
1013         lfsck_layout_cpu_to_le(lo, lo_ram);
1014         th = dt_trans_create(env, dev);
1015         if (IS_ERR(th))
1016                 GOTO(log, rc = PTR_ERR(th));
1017
1018         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
1019                                      (loff_t)0, th);
1020         if (rc != 0)
1021                 GOTO(out, rc);
1022
1023         if (bitmap != NULL) {
1024                 rc = dt_declare_record_write(env, obj,
1025                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
1026                                 (loff_t)size, th);
1027                 if (rc != 0)
1028                         GOTO(out, rc);
1029         }
1030
1031         rc = dt_trans_start_local(env, dev, th);
1032         if (rc != 0)
1033                 GOTO(out, rc);
1034
1035         pos = 0;
1036         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos, th);
1037         if (rc != 0)
1038                 GOTO(out, rc);
1039
1040         if (bitmap != NULL) {
1041                 pos = size;
1042                 rc = dt_record_write(env, obj,
1043                                 lfsck_buf_get(env, bitmap->data, nbits >> 3),
1044                                 &pos, th);
1045         }
1046
1047         GOTO(out, rc);
1048
1049 out:
1050         dt_trans_stop(env, dev, th);
1051
1052 log:
1053         if (rc != 0)
1054                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
1055                        lfsck_lfsck2name(lfsck), rc);
1056
1057         return rc;
1058 }
1059
1060 static int lfsck_layout_init(const struct lu_env *env,
1061                              struct lfsck_component *com)
1062 {
1063         struct lfsck_layout *lo = com->lc_file_ram;
1064         int rc;
1065
1066         memset(lo, 0, com->lc_file_size);
1067         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
1068         lo->ll_status = LS_INIT;
1069         down_write(&com->lc_sem);
1070         rc = lfsck_layout_store(env, com);
1071         if (rc == 0 && com->lc_lfsck->li_master)
1072                 rc = lfsck_load_sub_trace_files(env, com,
1073                         &dt_lfsck_layout_dangling_features, LFSCK_LAYOUT, true);
1074         up_write(&com->lc_sem);
1075
1076         return rc;
1077 }
1078
1079 static int fid_is_for_ostobj(const struct lu_env *env,
1080                              struct lfsck_instance *lfsck,
1081                              struct dt_object *obj, const struct lu_fid *fid)
1082 {
1083         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1084         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
1085         struct lustre_ost_attrs *loa;
1086         int                      rc;
1087
1088         fld_range_set_any(range);
1089         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
1090         if (rc == 0) {
1091                 if (fld_range_is_ost(range))
1092                         return 1;
1093
1094                 return 0;
1095         }
1096
1097         loa = &lfsck_env_info(env)->lti_loa;
1098         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, loa, sizeof(*loa)),
1099                           XATTR_NAME_LMA);
1100         if (rc >= sizeof(struct lustre_mdt_attrs)) {
1101                 lustre_lma_swab(&loa->loa_lma);
1102
1103                 return loa->loa_lma.lma_compat & LMAC_FID_ON_OST ? 1 : 0;
1104         }
1105
1106         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID);
1107
1108         return rc > 0;
1109 }
1110
1111 static struct lfsck_layout_seq *
1112 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
1113 {
1114         struct lfsck_layout_seq *lls;
1115
1116         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1117                 if (lls->lls_seq == seq)
1118                         return lls;
1119
1120                 if (lls->lls_seq > seq)
1121                         return NULL;
1122         }
1123
1124         return NULL;
1125 }
1126
1127 static void
1128 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
1129                         struct lfsck_layout_seq *lls)
1130 {
1131         struct lfsck_layout_seq *tmp;
1132         struct list_head        *pos = &llsd->llsd_seq_list;
1133
1134         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
1135                 if (lls->lls_seq < tmp->lls_seq) {
1136                         pos = &tmp->lls_list;
1137                         break;
1138                 }
1139         }
1140         list_add_tail(&lls->lls_list, pos);
1141 }
1142
1143 static int
1144 lfsck_layout_lastid_create(const struct lu_env *env,
1145                            struct lfsck_instance *lfsck,
1146                            struct dt_object *obj)
1147 {
1148         struct lfsck_thread_info *info   = lfsck_env_info(env);
1149         struct lu_attr           *la     = &info->lti_la;
1150         struct dt_object_format  *dof    = &info->lti_dof;
1151         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1152         struct dt_device         *dt     = lfsck_obj2dev(obj);
1153         struct thandle           *th;
1154         __u64                     lastid = 0;
1155         loff_t                    pos    = 0;
1156         int                       rc;
1157         ENTRY;
1158
1159         if (bk->lb_param & LPF_DRYRUN)
1160                 return 0;
1161
1162         memset(la, 0, sizeof(*la));
1163         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
1164         la->la_valid = LA_MODE | LA_UID | LA_GID;
1165         memset(dof, 0, sizeof(*dof));
1166         dof->dof_type = dt_mode_to_dft(S_IFREG);
1167
1168         th = dt_trans_create(env, dt);
1169         if (IS_ERR(th))
1170                 GOTO(log, rc = PTR_ERR(th));
1171
1172         rc = dt_declare_create(env, obj, la, NULL, dof, th);
1173         if (rc != 0)
1174                 GOTO(stop, rc);
1175
1176         rc = dt_declare_record_write(env, obj,
1177                                      lfsck_buf_get(env, &lastid,
1178                                                    sizeof(lastid)),
1179                                      pos, th);
1180         if (rc != 0)
1181                 GOTO(stop, rc);
1182
1183         rc = dt_trans_start_local(env, dt, th);
1184         if (rc != 0)
1185                 GOTO(stop, rc);
1186
1187         dt_write_lock(env, obj, 0);
1188         if (likely(dt_object_exists(obj) == 0)) {
1189                 rc = dt_create(env, obj, la, NULL, dof, th);
1190                 if (rc == 0)
1191                         rc = dt_record_write(env, obj,
1192                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
1193                                 &pos, th);
1194         }
1195         dt_write_unlock(env, obj);
1196
1197         GOTO(stop, rc);
1198
1199 stop:
1200         dt_trans_stop(env, dt, th);
1201
1202 log:
1203         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
1204                "%#llx: rc = %d\n",
1205                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
1206
1207         return rc;
1208 }
1209
1210 static int
1211 lfsck_layout_lastid_reload(const struct lu_env *env,
1212                            struct lfsck_component *com,
1213                            struct lfsck_layout_seq *lls)
1214 {
1215         __u64   lastid;
1216         loff_t  pos     = 0;
1217         int     rc;
1218
1219         dt_read_lock(env, lls->lls_lastid_obj, 0);
1220         rc = dt_record_read(env, lls->lls_lastid_obj,
1221                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
1222         dt_read_unlock(env, lls->lls_lastid_obj);
1223         if (unlikely(rc != 0))
1224                 return rc;
1225
1226         lastid = le64_to_cpu(lastid);
1227         if (lastid < lls->lls_lastid_known) {
1228                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
1229                 struct lfsck_layout     *lo     = com->lc_file_ram;
1230
1231                 lls->lls_lastid = lls->lls_lastid_known;
1232                 lls->lls_dirty = 1;
1233                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1234                         LASSERT(lfsck->li_out_notify != NULL);
1235
1236                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1237                                              LE_LASTID_REBUILDING);
1238                         lo->ll_flags |= LF_CRASHED_LASTID;
1239
1240                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
1241                                "LAST_ID file (1) for the sequence %#llx"
1242                                ", old value %llu, known value %llu\n",
1243                                lfsck_lfsck2name(lfsck), lls->lls_seq,
1244                                lastid, lls->lls_lastid);
1245                 }
1246         } else if (lastid >= lls->lls_lastid) {
1247                 lls->lls_lastid = lastid;
1248                 lls->lls_dirty = 0;
1249         }
1250
1251         return 0;
1252 }
1253
1254 static int
1255 lfsck_layout_lastid_store(const struct lu_env *env,
1256                           struct lfsck_component *com)
1257 {
1258         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1259         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1260         struct dt_device                *dt     = lfsck->li_bottom;
1261         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1262         struct lfsck_layout_seq         *lls;
1263         struct thandle                  *th;
1264         __u64                            lastid;
1265         int                              rc     = 0;
1266         int                              rc1    = 0;
1267
1268         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1269                 loff_t pos = 0;
1270
1271                 if (!lls->lls_dirty)
1272                         continue;
1273
1274                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1275                        "<seq> %#llx as <oid> %llu\n",
1276                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1277
1278                 if (bk->lb_param & LPF_DRYRUN) {
1279                         lls->lls_dirty = 0;
1280                         continue;
1281                 }
1282
1283                 th = dt_trans_create(env, dt);
1284                 if (IS_ERR(th)) {
1285                         rc1 = PTR_ERR(th);
1286                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1287                                "the LAST_ID for <seq> %#llx(1): rc = %d\n",
1288                                lfsck_lfsck2name(com->lc_lfsck),
1289                                lls->lls_seq, rc1);
1290                         continue;
1291                 }
1292
1293                 lastid = cpu_to_le64(lls->lls_lastid);
1294                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1295                                              lfsck_buf_get(env, &lastid,
1296                                                            sizeof(lastid)),
1297                                              pos, th);
1298                 if (rc != 0)
1299                         goto stop;
1300
1301                 rc = dt_trans_start_local(env, dt, th);
1302                 if (rc != 0)
1303                         goto stop;
1304
1305                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1306                 rc = dt_record_write(env, lls->lls_lastid_obj,
1307                                      lfsck_buf_get(env, &lastid,
1308                                      sizeof(lastid)), &pos, th);
1309                 dt_write_unlock(env, lls->lls_lastid_obj);
1310                 if (rc == 0)
1311                         lls->lls_dirty = 0;
1312
1313 stop:
1314                 dt_trans_stop(env, dt, th);
1315                 if (rc != 0) {
1316                         rc1 = rc;
1317                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1318                                "the LAST_ID for <seq> %#llx(2): rc = %d\n",
1319                                lfsck_lfsck2name(com->lc_lfsck),
1320                                lls->lls_seq, rc1);
1321                 }
1322         }
1323
1324         return rc1;
1325 }
1326
1327 static int
1328 lfsck_layout_lastid_load(const struct lu_env *env,
1329                          struct lfsck_component *com,
1330                          struct lfsck_layout_seq *lls)
1331 {
1332         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1333         struct lfsck_layout     *lo     = com->lc_file_ram;
1334         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1335         struct dt_object        *obj;
1336         loff_t                   pos    = 0;
1337         int                      rc;
1338         ENTRY;
1339
1340         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck));
1341         obj = dt_locate(env, lfsck->li_bottom, fid);
1342         if (IS_ERR(obj))
1343                 RETURN(PTR_ERR(obj));
1344
1345         /* LAST_ID crashed, to be rebuilt */
1346         if (dt_object_exists(obj) == 0) {
1347                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1348                         LASSERT(lfsck->li_out_notify != NULL);
1349
1350                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1351                                              LE_LASTID_REBUILDING);
1352                         lo->ll_flags |= LF_CRASHED_LASTID;
1353
1354                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1355                                "LAST_ID file for sequence %#llx\n",
1356                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1357
1358                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1359                             cfs_fail_val > 0) {
1360                                 struct l_wait_info lwi = LWI_TIMEOUT(
1361                                                 cfs_time_seconds(cfs_fail_val),
1362                                                 NULL, NULL);
1363
1364                                 /* Some others may changed the cfs_fail_val
1365                                  * as zero after above check, re-check it for
1366                                  * sure to avoid falling into wait for ever. */
1367                                 if (likely(lwi.lwi_timeout > 0)) {
1368                                         struct ptlrpc_thread *thread =
1369                                                 &lfsck->li_thread;
1370
1371                                         up_write(&com->lc_sem);
1372                                         l_wait_event(thread->t_ctl_waitq,
1373                                                      !thread_is_running(thread),
1374                                                      &lwi);
1375                                         down_write(&com->lc_sem);
1376                                 }
1377                         }
1378                 }
1379
1380                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1381         } else {
1382                 dt_read_lock(env, obj, 0);
1383                 rc = dt_read(env, obj,
1384                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1385                         &pos);
1386                 dt_read_unlock(env, obj);
1387                 if (rc != 0 && rc != sizeof(__u64))
1388                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1389
1390                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1391                         LASSERT(lfsck->li_out_notify != NULL);
1392
1393                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1394                                              LE_LASTID_REBUILDING);
1395                         lo->ll_flags |= LF_CRASHED_LASTID;
1396
1397                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1398                                "LAST_ID file for the sequence %#llx"
1399                                ": rc = %d\n",
1400                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1401                 }
1402
1403                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1404                 rc = 0;
1405         }
1406
1407         GOTO(out, rc);
1408
1409 out:
1410         if (rc != 0)
1411                 lfsck_object_put(env, obj);
1412         else
1413                 lls->lls_lastid_obj = obj;
1414
1415         return rc;
1416 }
1417
1418 static void lfsck_layout_record_failure(const struct lu_env *env,
1419                                         struct lfsck_instance *lfsck,
1420                                         struct lfsck_layout *lo)
1421 {
1422         __u64 cookie;
1423
1424         lo->ll_objs_failed_phase1++;
1425         cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1426                                                         lfsck->li_di_oit);
1427         if (lo->ll_pos_first_inconsistent == 0 ||
1428             lo->ll_pos_first_inconsistent < cookie) {
1429                 lo->ll_pos_first_inconsistent = cookie;
1430
1431                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1432                        "inconsistency at the pos [%llu]\n",
1433                        lfsck_lfsck2name(lfsck),
1434                        lo->ll_pos_first_inconsistent);
1435         }
1436 }
1437
1438 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1439                                            struct lfsck_component *com,
1440                                            int rc)
1441 {
1442         struct lfsck_instance   *lfsck = com->lc_lfsck;
1443         struct lfsck_layout     *lo    = com->lc_file_ram;
1444
1445         down_write(&com->lc_sem);
1446         lo->ll_run_time_phase2 += ktime_get_seconds() -
1447                                   com->lc_time_last_checkpoint;
1448         lo->ll_time_last_checkpoint = ktime_get_real_seconds();
1449         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1450
1451         if (rc > 0) {
1452                 if (lo->ll_flags & LF_INCOMPLETE) {
1453                         lo->ll_status = LS_PARTIAL;
1454                 } else {
1455                         if (lfsck->li_master) {
1456                                 struct lfsck_assistant_data *lad = com->lc_data;
1457
1458                                 if (lad->lad_incomplete)
1459                                         lo->ll_status = LS_PARTIAL;
1460                                 else
1461                                         lo->ll_status = LS_COMPLETED;
1462                         } else {
1463                                 lo->ll_status = LS_COMPLETED;
1464                         }
1465                 }
1466                 lo->ll_flags &= ~LF_SCANNED_ONCE;
1467                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1468                         lo->ll_flags &= ~LF_INCONSISTENT;
1469                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1470                 lo->ll_success_count++;
1471         } else if (rc == 0) {
1472                 if (lfsck->li_status != 0)
1473                         lo->ll_status = lfsck->li_status;
1474                 else
1475                         lo->ll_status = LS_STOPPED;
1476         } else {
1477                 lo->ll_status = LS_FAILED;
1478         }
1479
1480         rc = lfsck_layout_store(env, com);
1481         up_write(&com->lc_sem);
1482
1483         return rc;
1484 }
1485
1486 static int lfsck_layout_trans_stop(const struct lu_env *env,
1487                                    struct dt_device *dev,
1488                                    struct thandle *handle, int result)
1489 {
1490         int rc;
1491
1492         /* XXX: If there is something worng or it needs to repair nothing,
1493          *      then notify the lower to stop the modification. Currently,
1494          *      we use th_result for such purpose, that may be replaced by
1495          *      some rollback mechanism in the future. */
1496         handle->th_result = result;
1497         rc = dt_trans_stop(env, dev, handle);
1498         if (result != 0)
1499                 return result > 0 ? 0 : result;
1500
1501         return rc == 0 ? 1 : rc;
1502 }
1503
1504 static int lfsck_layout_ins_dangling_rec(const struct lu_env *env,
1505                                          struct lfsck_component *com,
1506                                          const struct lu_fid *pfid,
1507                                          const struct lu_fid *cfid,
1508                                          __u32 comp_id, __u32 ea_off,
1509                                          __u32 ost_idx)
1510 {
1511         struct lfsck_layout_dangling_key *key = &lfsck_env_info(env)->lti_lldk;
1512         struct lu_fid *rec = &lfsck_env_info(env)->lti_fid3;
1513         struct dt_device *dev;
1514         struct dt_object *obj;
1515         struct thandle *th = NULL;
1516         int idx;
1517         int rc = 0;
1518         ENTRY;
1519
1520         idx = lfsck_sub_trace_file_fid2idx(pfid);
1521         obj = com->lc_sub_trace_objs[idx].lsto_obj;
1522         dev = lfsck_obj2dev(obj);
1523
1524         fid_cpu_to_be(&key->lldk_fid, pfid);
1525         key->lldk_comp_id = cpu_to_be32(comp_id);
1526         key->lldk_ea_off = cpu_to_be32(ea_off);
1527
1528         fid_cpu_to_be(rec, cfid);
1529         rec->f_ver = cpu_to_be32(ost_idx);
1530
1531         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1532
1533         th = dt_trans_create(env, dev);
1534         if (IS_ERR(th))
1535                 GOTO(unlock, rc = PTR_ERR(th));
1536
1537         rc = dt_declare_insert(env, obj,
1538                                (const struct dt_rec *)rec,
1539                                (const struct dt_key *)key, th);
1540         if (rc)
1541                 GOTO(unlock, rc);
1542
1543         rc = dt_trans_start_local(env, dev, th);
1544         if (rc)
1545                 GOTO(unlock, rc);
1546
1547         rc = dt_insert(env, obj, (const struct dt_rec *)rec,
1548                        (const struct dt_key *)key, th, 1);
1549
1550         GOTO(unlock, rc);
1551
1552 unlock:
1553         if (th && !IS_ERR(th))
1554                 dt_trans_stop(env, dev, th);
1555
1556         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1557
1558         CDEBUG(D_LFSCK, "%s: insert the paris "DFID" => "DFID", comp_id = %u, "
1559                "ea_off = %u, ost_idx = %u, into the trace file for further "
1560                "dangling check: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1561                PFID(pfid), PFID(cfid), comp_id, ea_off, ost_idx, rc);
1562
1563         return rc;
1564 }
1565
1566 static int lfsck_layout_del_dangling_rec(const struct lu_env *env,
1567                                          struct lfsck_component *com,
1568                                          const struct lu_fid *fid,
1569                                          __u32 comp_id, __u32 ea_off)
1570 {
1571         struct lfsck_layout_dangling_key *key = &lfsck_env_info(env)->lti_lldk;
1572         struct dt_device *dev;
1573         struct dt_object *obj;
1574         struct thandle *th = NULL;
1575         int idx;
1576         int rc = 0;
1577         ENTRY;
1578
1579         idx = lfsck_sub_trace_file_fid2idx(fid);
1580         obj = com->lc_sub_trace_objs[idx].lsto_obj;
1581         dev = lfsck_obj2dev(obj);
1582
1583         fid_cpu_to_be(&key->lldk_fid, fid);
1584         key->lldk_comp_id = cpu_to_be32(comp_id);
1585         key->lldk_ea_off = cpu_to_be32(ea_off);
1586
1587         mutex_lock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1588
1589         th = dt_trans_create(env, dev);
1590         if (IS_ERR(th))
1591                 GOTO(unlock, rc = PTR_ERR(th));
1592
1593         rc = dt_declare_delete(env, obj, (const struct dt_key *)key, th);
1594         if (rc)
1595                 GOTO(unlock, rc);
1596
1597         rc = dt_trans_start_local(env, dev, th);
1598         if (rc)
1599                 GOTO(unlock, rc);
1600
1601         rc = dt_delete(env, obj, (const struct dt_key *)key, th);
1602
1603         GOTO(unlock, rc);
1604
1605 unlock:
1606         if (th && !IS_ERR(th))
1607                 dt_trans_stop(env, dev, th);
1608
1609         mutex_unlock(&com->lc_sub_trace_objs[idx].lsto_mutex);
1610
1611         CDEBUG(D_LFSCK, "%s: delete the dangling record for "DFID
1612                ", comp_id = %u, ea_off = %u from the trace file: rc = %d\n",
1613                lfsck_lfsck2name(com->lc_lfsck), PFID(fid), comp_id, ea_off, rc);
1614
1615         return rc;
1616 }
1617
1618 /**
1619  * Get the system default stripe size.
1620  *
1621  * \param[in] env       pointer to the thread context
1622  * \param[in] lfsck     pointer to the lfsck instance
1623  * \param[out] size     pointer to the default stripe size
1624  *
1625  * \retval              0 for success
1626  * \retval              negative error number on failure
1627  */
1628 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1629                                            struct lfsck_instance *lfsck,
1630                                            __u32 *size)
1631 {
1632         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1633         struct dt_object        *root;
1634         int                      rc;
1635
1636         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1637         if (IS_ERR(root))
1638                 return PTR_ERR(root);
1639
1640         /* Get the default stripe size via xattr_get on the backend root. */
1641         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1642                           XATTR_NAME_LOV);
1643         if (rc > 0) {
1644                 /* The lum->lmm_stripe_size is LE mode. The *size also
1645                  * should be LE mode. So it is unnecessary to convert. */
1646                 *size = lum->lmm_stripe_size;
1647                 rc = 0;
1648         } else if (unlikely(rc == 0)) {
1649                 rc = -EINVAL;
1650         }
1651
1652         lfsck_object_put(env, root);
1653
1654         return rc;
1655 }
1656
1657 /**
1658  * \retval       +1: repaired
1659  * \retval        0: did nothing
1660  * \retval      -ve: on error
1661  */
1662 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1663                                      struct lfsck_instance *lfsck,
1664                                      struct thandle *handle,
1665                                      struct dt_object *parent,
1666                                      const struct lu_fid *cfid,
1667                                      struct lu_buf *buf,
1668                                      struct lov_mds_md_v1 *lmm,
1669                                      struct lov_ost_data_v1 *slot,
1670                                      int fl, __u32 ost_idx, int size)
1671 {
1672         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1673         struct lu_buf            ea_buf;
1674         int                      rc;
1675         __u32                    magic;
1676         __u32                    pattern;
1677         __u16                    count;
1678         ENTRY;
1679
1680         magic = le32_to_cpu(lmm->lmm_magic);
1681         pattern = le32_to_cpu(lmm->lmm_pattern);
1682         count = le16_to_cpu(lmm->lmm_stripe_count);
1683
1684         fid_to_ostid(cfid, oi);
1685         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1686         slot->l_ost_gen = cpu_to_le32(0);
1687         slot->l_ost_idx = cpu_to_le32(ost_idx);
1688
1689         if (pattern & LOV_PATTERN_F_HOLE) {
1690                 struct lov_ost_data_v1 *objs;
1691                 int                     i;
1692
1693                 if (magic == LOV_MAGIC_V1)
1694                         objs = &lmm->lmm_objects[0];
1695                 else
1696                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1697                 for (i = 0; i < count; i++, objs++) {
1698                         if (lovea_slot_is_dummy(objs))
1699                                 break;
1700                 }
1701
1702                 /* If the @slot is the last dummy slot to be refilled,
1703                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1704                 if (i == count) {
1705                         lmm->lmm_pattern =
1706                                 cpu_to_le32(pattern & ~LOV_PATTERN_F_HOLE);
1707
1708                         CDEBUG(D_LFSCK, "%s: remove layout HOLE for "DFID
1709                                ": parent "DFID"\n", lfsck_lfsck2name(lfsck),
1710                                PFID(cfid), PFID(lfsck_dto2fid(parent)));
1711                 }
1712         }
1713
1714         lfsck_buf_init(&ea_buf, buf->lb_buf, size);
1715         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle);
1716         if (rc == 0)
1717                 rc = 1;
1718
1719         RETURN(rc);
1720 }
1721
1722 static struct lov_ost_data_v1 *
1723 __lfsck_layout_new_v1_lovea(struct lov_mds_md_v1 *lmm,
1724                             const struct lu_fid *pfid,
1725                             __u32 stripe_size, __u32 ea_off,
1726                             __u32 pattern, __u16 count)
1727 {
1728         lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1729         lmm->lmm_pattern = cpu_to_le32(pattern);
1730         fid_to_lmm_oi(pfid, &lmm->lmm_oi);
1731         lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1732         lmm->lmm_stripe_size = cpu_to_le32(stripe_size);
1733         lmm->lmm_stripe_count = cpu_to_le16(count);
1734         lmm->lmm_layout_gen = cpu_to_le16(1);
1735         memset(&lmm->lmm_objects[0], 0,
1736                sizeof(struct lov_ost_data_v1) * count);
1737
1738         return &lmm->lmm_objects[ea_off];
1739 }
1740
1741 static int lfsck_layout_new_v1_lovea(const struct lu_env *env,
1742                                      struct lfsck_instance *lfsck,
1743                                      struct ost_layout *ol,
1744                                      struct dt_object *parent,
1745                                      struct lu_buf *buf, __u32 ea_off,
1746                                      struct lov_mds_md_v1 **lmm,
1747                                      struct lov_ost_data_v1 **objs)
1748 {
1749         int size;
1750         __u32 stripe_size = ol->ol_stripe_size;
1751         __u32 pattern = LOV_PATTERN_RAID0;
1752         __u16 count;
1753
1754         if (ol->ol_stripe_count != 0)
1755                 count = ol->ol_stripe_count;
1756         else
1757                 count = ea_off + 1;
1758
1759         size = lov_mds_md_size(count, LOV_MAGIC_V1);
1760         LASSERTF(buf->lb_len >= size,
1761                  "buffer len %d is less than real size %d\n",
1762                  (int)buf->lb_len, size);
1763
1764         if (stripe_size == 0) {
1765                 int rc;
1766
1767                 rc = lfsck_layout_get_def_stripesize(env, lfsck, &stripe_size);
1768                 if (rc)
1769                         return rc;
1770         }
1771
1772         *lmm = buf->lb_buf;
1773         if (ol->ol_stripe_count > 1 ||
1774             (ol->ol_stripe_count == 0 && ea_off != 0)) {
1775                 pattern |= LOV_PATTERN_F_HOLE;
1776                 memset(&(*lmm)->lmm_objects[0], 0,
1777                        count * sizeof(struct lov_ost_data_v1));
1778         }
1779
1780         *objs = __lfsck_layout_new_v1_lovea(*lmm, lfsck_dto2fid(parent),
1781                                 stripe_size, ea_off, pattern, count);
1782
1783         return size;
1784 }
1785
1786 static int lfsck_layout_new_comp_lovea(const struct lu_env *env,
1787                                       struct ost_layout *ol,
1788                                       struct dt_object *parent,
1789                                       struct lu_buf *buf, __u32 ea_off,
1790                                       struct lov_mds_md_v1 **lmm,
1791                                       struct lov_ost_data_v1 **objs)
1792 {
1793         struct lov_comp_md_v1 *lcm;
1794         struct lov_comp_md_entry_v1 *lcme;
1795         __u32 pattern = LOV_PATTERN_RAID0;
1796         __u32 offset = sizeof(*lcm) + sizeof(*lcme);
1797         int lcme_size = lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
1798         int size = offset + lcme_size;
1799
1800         LASSERTF(buf->lb_len >= size,
1801                  "buffer len %d is less than real size %d\n",
1802                  (int)buf->lb_len, size);
1803
1804         lcm = buf->lb_buf;
1805         lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
1806         lcm->lcm_size = cpu_to_le32(size);
1807         lcm->lcm_layout_gen = cpu_to_le32(1);
1808         lcm->lcm_flags = 0;
1809         lcm->lcm_entry_count = cpu_to_le16(1);
1810
1811         lcme = &lcm->lcm_entries[0];
1812         lcme->lcme_id = cpu_to_le32(ol->ol_comp_id);
1813         lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
1814         lcme->lcme_extent.e_start = cpu_to_le64(ol->ol_comp_start);
1815         lcme->lcme_extent.e_end = cpu_to_le64(ol->ol_comp_end);
1816         lcme->lcme_offset = cpu_to_le32(offset);
1817         lcme->lcme_size = cpu_to_le32(lcme_size);
1818         if (ol->ol_stripe_count > 1)
1819                 pattern |= LOV_PATTERN_F_HOLE;
1820
1821         *lmm = buf->lb_buf + offset;
1822         *objs = __lfsck_layout_new_v1_lovea(*lmm, lfsck_dto2fid(parent),
1823                                             ol->ol_stripe_size, ea_off,
1824                                             pattern, ol->ol_stripe_count);
1825
1826         return size;
1827 }
1828
1829 static int lfsck_layout_add_comp_comp(const struct lu_env *env,
1830                                      struct lfsck_instance *lfsck,
1831                                      struct thandle *handle,
1832                                      struct ost_layout *ol,
1833                                      struct dt_object *parent,
1834                                      const struct lu_fid *cfid,
1835                                      struct lu_buf *buf, __u32 ost_idx,
1836                                      __u32 ea_off, int pos)
1837 {
1838         struct lov_comp_md_v1 *lcm = buf->lb_buf;
1839         struct lov_comp_md_entry_v1 *lcme;
1840         struct lov_mds_md_v1 *lmm;
1841         struct lov_ost_data_v1 *objs;
1842         int added = sizeof(*lcme) +
1843                     lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
1844         int size = le32_to_cpu(lcm->lcm_size) + added;
1845         int rc;
1846         int i;
1847         __u32 offset;
1848         __u32 pattern = LOV_PATTERN_RAID0;
1849         __u16 count = le16_to_cpu(lcm->lcm_entry_count);
1850         ENTRY;
1851
1852         lu_buf_check_and_grow(buf, size);
1853         /* set the lcm again because lu_buf_check_and_grow() may
1854          * have reallocated the buf. */
1855         lcm = buf->lb_buf;
1856         lcm->lcm_size = cpu_to_le32(size);
1857         le32_add_cpu(&lcm->lcm_layout_gen, 1);
1858         lcm->lcm_entry_count = cpu_to_le16(count + 1);
1859
1860         /* 1. Move the component bodies from [pos, count-1] to [pos+1, count]
1861          *    with distance of 'added'. */
1862         if (pos < count) {
1863                 size = 0;
1864                 for (i = pos; i < count; i++) {
1865                         lcme = &lcm->lcm_entries[i];
1866                         size += le32_to_cpu(lcme->lcme_size);
1867                 }
1868
1869                 offset = le32_to_cpu(lcm->lcm_entries[pos].lcme_offset);
1870                 memmove(buf->lb_buf + offset + added,
1871                         buf->lb_buf + offset, size);
1872         }
1873
1874         size = 0;
1875         /* 2. Move the component header [0, pos-1] to [0, pos-1] with distance
1876          *    of 'sizeof(struct lov_comp_md_entry_v1)' */
1877         if (pos > 0) {
1878                 for (i = 0; i < pos; i++) {
1879                         lcme = &lcm->lcm_entries[i];
1880                         size += le32_to_cpu(lcme->lcme_size);
1881                 }
1882
1883                 offset = le32_to_cpu(lcm->lcm_entries[0].lcme_offset);
1884                 memmove(buf->lb_buf + offset + sizeof(*lcme),
1885                         buf->lb_buf + offset, size);
1886         }
1887
1888         /* 3. Recalculate the enter offset for the component [pos, count-1] */
1889         for (i = count - 1; i >= pos; i--) {
1890                 lcm->lcm_entries[i + 1] = lcm->lcm_entries[i];
1891                 lcm->lcm_entries[i + 1].lcme_offset =
1892                         cpu_to_le32(le32_to_cpu(lcm->lcm_entries[i + 1].
1893                                                 lcme_offset) + added);
1894         }
1895
1896         /* 4. Recalculate the enter offset for the component [0, pos) */
1897         for (i = 0; i < pos; i++) {
1898                 lcm->lcm_entries[i].lcme_offset =
1899                         cpu_to_le32(le32_to_cpu(lcm->lcm_entries[i].
1900                                                 lcme_offset) + sizeof(*lcme));
1901         }
1902
1903         offset = sizeof(*lcm) + sizeof(*lcme) * (count + 1) + size;
1904         /* 4. Insert the new component header (entry) at the slot 'pos'. */
1905         lcme = &lcm->lcm_entries[pos];
1906         lcme->lcme_id = cpu_to_le32(ol->ol_comp_id);
1907         lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT);
1908         lcme->lcme_extent.e_start = cpu_to_le64(ol->ol_comp_start);
1909         lcme->lcme_extent.e_end = cpu_to_le64(ol->ol_comp_end);
1910         lcme->lcme_offset = cpu_to_le32(offset);
1911         lcme->lcme_size = cpu_to_le32(lov_mds_md_size(ol->ol_stripe_count,
1912                                                       LOV_MAGIC_V1));
1913
1914         if (ol->ol_stripe_count > 1)
1915                 pattern |= LOV_PATTERN_F_HOLE;
1916
1917         lmm = buf->lb_buf + offset;
1918         /* 5. Insert teh new component body at the 'offset'. */
1919         objs = __lfsck_layout_new_v1_lovea(lmm, lfsck_dto2fid(parent),
1920                                            ol->ol_stripe_size, ea_off,
1921                                            pattern, ol->ol_stripe_count);
1922
1923         rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid, buf,
1924                                        lmm, objs, LU_XATTR_REPLACE, ost_idx,
1925                                        le32_to_cpu(lcm->lcm_size));
1926
1927         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant add new COMP for "
1928                DFID": parent "DFID", OST-index %u, stripe-index %u, "
1929                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
1930                "comp_end %llu, %s LOV EA hole: rc = %d\n",
1931                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1932                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
1933                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end,
1934                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
1935                "with" : "without", rc);
1936
1937         RETURN(rc);
1938 }
1939
1940 static int lfsck_layout_extend_v1v3_lovea(const struct lu_env *env,
1941                                           struct lfsck_instance *lfsck,
1942                                           struct thandle *handle,
1943                                           struct ost_layout *ol,
1944                                           struct dt_object *parent,
1945                                           const struct lu_fid *cfid,
1946                                           struct lu_buf *buf, __u32 ost_idx,
1947                                           __u32 ea_off)
1948 {
1949         struct lov_mds_md_v1 *lmm = buf->lb_buf;
1950         struct lov_ost_data_v1 *objs;
1951         __u16 count = le16_to_cpu(lmm->lmm_stripe_count);
1952         __u32 magic = le32_to_cpu(lmm->lmm_magic);
1953         int size;
1954         int gap;
1955         int rc;
1956         ENTRY;
1957
1958         /* The original LOVEA maybe re-generated via old filter_fid, at
1959          * that time, we do not know the stripe count and stripe size. */
1960         if (ol->ol_stripe_count > count)
1961                 count = ol->ol_stripe_count;
1962         if (ol->ol_stripe_size != 0 &&
1963             ol->ol_stripe_size != le32_to_cpu(lmm->lmm_stripe_size))
1964                 lmm->lmm_stripe_size = cpu_to_le32(ol->ol_stripe_size);
1965
1966         if (magic == LOV_MAGIC_V1)
1967                 objs = &lmm->lmm_objects[count];
1968         else
1969                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[count];
1970
1971         gap = ea_off - count;
1972         if (gap >= 0)
1973                 count = ea_off + 1;
1974
1975         size = lov_mds_md_size(count, magic);
1976         LASSERTF(buf->lb_len >= size,
1977                  "buffer len %d is less than real size %d\n",
1978                  (int)buf->lb_len, size);
1979
1980         if (gap > 0) {
1981                 memset(objs, 0, gap * sizeof(*objs));
1982                 lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1983         }
1984
1985         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1986         lmm->lmm_stripe_count = cpu_to_le16(count);
1987         objs += gap;
1988
1989         rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid, buf,
1990                                 lmm, objs, LU_XATTR_REPLACE, ost_idx, size);
1991
1992         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1993                DFID": parent "DFID", OST-index %u, stripe-index %u, "
1994                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
1995                "comp_end %llu, %s LOV EA hole: rc = %d\n",
1996                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1997                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
1998                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end,
1999                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
2000                "with" : "without", rc);
2001
2002         RETURN(rc);
2003 }
2004
2005 /**
2006  * \retval       +1: repaired
2007  * \retval        0: did nothing
2008  * \retval      -ve: on error
2009  */
2010 static int lfsck_layout_update_lovea(const struct lu_env *env,
2011                                      struct lfsck_instance *lfsck,
2012                                      struct thandle *handle,
2013                                      struct ost_layout *ol,
2014                                      struct dt_object *parent,
2015                                      const struct lu_fid *cfid,
2016                                      struct lu_buf *buf, int fl,
2017                                      __u32 ost_idx, __u32 ea_off)
2018 {
2019         struct lov_mds_md_v1 *lmm = NULL;
2020         struct lov_ost_data_v1 *objs = NULL;
2021         int rc = 0;
2022         ENTRY;
2023
2024         if (ol->ol_comp_id != 0)
2025                 rc = lfsck_layout_new_comp_lovea(env, ol, parent, buf, ea_off,
2026                                                 &lmm, &objs);
2027         else
2028                 rc = lfsck_layout_new_v1_lovea(env, lfsck, ol, parent, buf,
2029                                                ea_off, &lmm, &objs);
2030
2031         if (rc > 0)
2032                 rc = lfsck_layout_refill_lovea(env, lfsck, handle, parent, cfid,
2033                                                buf, lmm, objs, fl, ost_idx, rc);
2034
2035         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant created layout EA for "
2036                DFID": parent "DFID", OST-index %u, stripe-index %u, "
2037                "stripe_size %u, stripe_count %u, comp_id %u, comp_start %llu, "
2038                "comp_end %llu, fl %d, %s LOV EA hole: rc = %d\n",
2039                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
2040                ost_idx, ea_off, ol->ol_stripe_size, ol->ol_stripe_count,
2041                ol->ol_comp_id, ol->ol_comp_start, ol->ol_comp_end, fl,
2042                le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE ?
2043                "with" : "without", rc);
2044
2045         RETURN(rc);
2046 }
2047
2048 static int __lfsck_layout_update_pfid(const struct lu_env *env,
2049                                       struct dt_object *child,
2050                                       const struct lu_fid *pfid,
2051                                       const struct ost_layout *ol, __u32 offset)
2052 {
2053         struct dt_device        *dev    = lfsck_obj2dev(child);
2054         struct filter_fid       *ff     = &lfsck_env_info(env)->lti_ff;
2055         struct thandle          *handle;
2056         struct lu_buf            buf    = { NULL };
2057         int                      rc;
2058
2059         ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2060         ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2061         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2062          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2063          * parent MDT-object's layout EA. */
2064         ff->ff_parent.f_stripe_idx = cpu_to_le32(offset);
2065         ost_layout_cpu_to_le(&ff->ff_layout, ol);
2066         lfsck_buf_init(&buf, ff, sizeof(*ff));
2067
2068         handle = dt_trans_create(env, dev);
2069         if (IS_ERR(handle))
2070                 RETURN(PTR_ERR(handle));
2071
2072         rc = dt_declare_xattr_set(env, child, &buf, XATTR_NAME_FID, 0, handle);
2073         if (rc != 0)
2074                 GOTO(stop, rc);
2075
2076         rc = dt_trans_start_local(env, dev, handle);
2077         if (rc != 0)
2078                 GOTO(stop, rc);
2079
2080         rc = dt_xattr_set(env, child, &buf, XATTR_NAME_FID, 0, handle);
2081
2082         GOTO(stop, rc);
2083
2084 stop:
2085         dt_trans_stop(env, dev, handle);
2086
2087         return rc;
2088 }
2089
2090 /**
2091  * \retval       +1: repaired
2092  * \retval        0: did nothing
2093  * \retval      -ve: on error
2094  */
2095 static int lfsck_layout_update_pfid(const struct lu_env *env,
2096                                     struct lfsck_component *com,
2097                                     struct dt_object *parent,
2098                                     struct lu_fid *cfid,
2099                                     struct dt_device *cdev,
2100                                     struct ost_layout *ol, __u32 ea_off)
2101 {
2102         struct dt_object        *child;
2103         int                      rc     = 0;
2104         ENTRY;
2105
2106         child = lfsck_object_find_by_dev(env, cdev, cfid);
2107         if (IS_ERR(child))
2108                 RETURN(PTR_ERR(child));
2109
2110         rc = __lfsck_layout_update_pfid(env, child,
2111                                         lu_object_fid(&parent->do_lu),
2112                                         ol, ea_off);
2113         lfsck_object_put(env, child);
2114
2115         RETURN(rc == 0 ? 1 : rc);
2116 }
2117
2118 static int lfsck_lovea_size(struct ost_layout *ol, __u32 ea_off)
2119 {
2120         if (ol->ol_comp_id != 0)
2121                 return sizeof(struct lov_comp_md_v1) +
2122                        sizeof(struct lov_comp_md_entry_v1) +
2123                        lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
2124
2125         if (ol->ol_stripe_count != 0)
2126                 return lov_mds_md_size(ol->ol_stripe_count, LOV_MAGIC_V1);
2127
2128         return lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2129 }
2130
2131 /**
2132  * This function will create the MDT-object with the given (partial) LOV EA.
2133  *
2134  * Under some data corruption cases, the MDT-object of the file may be lost,
2135  * but its OST-objects, or some of them are there. The layout LFSCK needs to
2136  * re-create the MDT-object with the orphan OST-object(s) information.
2137  *
 * On the other hand, the LFSCK may have created some OST-object to repair a
 * dangling LOV EA reference, but as the LFSCK proceeds, it may find that
 * the old OST-object is there and should replace the newly created OST-
 * object. Unfortunately, some others have modified such newly created object.
 * To keep the data (both new and old), the LFSCK will create an MDT-object
 * with a new FID to reference the original OST-object.
2144  *
2145  * \param[in] env       pointer to the thread context
2146  * \param[in] com       pointer to the lfsck component
2147  * \param[in] ltd       pointer to target device descriptor
2148  * \param[in] rec       pointer to the record for the orphan OST-object
2149  * \param[in] cfid      pointer to FID for the orphan OST-object
2150  * \param[in] infix     additional information, such as the FID for original
2151  *                      MDT-object and the stripe offset in the LOV EA
2152  * \param[in] type      the type for describing why the orphan MDT-object is
2153  *                      created. The rules are as following:
2154  *
2155  *  type "C":           Multiple OST-objects claim the same MDT-object and the
2156  *                      same slot in the layout EA. Then the LFSCK will create
2157  *                      new MDT-object(s) to hold the conflict OST-object(s).
2158  *
2159  *  type "N":           The orphan OST-object does not know which one was the
2160  *                      real parent MDT-object, so the LFSCK uses new FID for
2161  *                      its parent MDT-object.
2162  *
2163  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
2164  *                      but does not know the position (the file name) in the
2165  *                      layout.
2166  *
 *  type "D":           The MDT-object is a directory, it may know its parent
 *                      but because there is no valid linkEA, the LFSCK cannot
 *                      know where to put it back to the namespace.
2170  *  type "O":           The MDT-object has no linkEA, and there is no name
2171  *                      entry that references the MDT-object.
2172  *
 *  type "P":           The orphan object to be created was a parent directory
 *                      of some MDT-object whose linkEA shows that the @orphan
 *                      object is missing.
2176  *
2177  * The orphan name will be like:
2178  * ${FID}-${infix}-${type}-${conflict_version}
2179  *
2180  * \param[in] ea_off    the stripe offset in the LOV EA
2181  *
2182  * \retval              positive on repaired something
2183  * \retval              0 if needs to repair nothing
2184  * \retval              negative error number on failure
2185  */
2186 static int lfsck_layout_recreate_parent(const struct lu_env *env,
2187                                         struct lfsck_component *com,
2188                                         struct lfsck_tgt_desc *ltd,
2189                                         struct lu_orphan_rec_v2 *rec,
2190                                         struct lu_fid *cfid,
2191                                         const char *infix,
2192                                         const char *type,
2193                                         __u32 ea_off)
2194 {
2195         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2196         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
2197         char                            *name   = info->lti_key;
2198         struct lu_attr                  *la     = &info->lti_la2;
2199         struct dt_object_format         *dof    = &info->lti_dof;
2200         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2201         struct ost_layout               *ol     = &rec->lor_layout;
2202         struct lu_fid                   *pfid   = &rec->lor_rec.lor_fid;
2203         struct lu_fid                   *tfid   = &info->lti_fid3;
2204         struct dt_device                *dev    = lfsck->li_bottom;
2205         struct dt_object                *lpf    = lfsck->li_lpf_obj;
2206         struct dt_object                *pobj   = NULL;
2207         struct dt_object                *cobj   = NULL;
2208         struct thandle                  *th     = NULL;
2209         struct lu_buf                   *ea_buf = &info->lti_big_buf;
2210         struct lu_buf                    lov_buf;
2211         struct lfsck_lock_handle        *llh    = &info->lti_llh;
2212         struct linkea_data               ldata  = { NULL };
2213         struct lu_buf                    linkea_buf;
2214         const struct lu_name            *pname;
2215         int                              size   = 0;
2216         int                              idx    = 0;
2217         int                              rc     = 0;
2218         ENTRY;
2219
2220         if (unlikely(lpf == NULL))
2221                 GOTO(log, rc = -ENXIO);
2222
2223         /* We use two separated transactions to repair the inconsistency.
2224          *
2225          * 1) create the MDT-object locally.
2226          * 2) update the OST-object's PFID EA if necessary.
2227          *
2228          * If 1) succeed, but 2) failed, then the OST-object's PFID EA will be
2229          * updated when the layout LFSCK run next time.
2230          *
2231          * If 1) failed, but 2) succeed, then such MDT-object will be re-created
2232          * when the layout LFSCK run next time. */
2233
2234         if (fid_is_zero(pfid)) {
2235                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2236                 if (rc != 0)
2237                         GOTO(log, rc);
2238
2239                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2240                 if (IS_ERR(cobj))
2241                         GOTO(log, rc = PTR_ERR(cobj));
2242         }
2243
2244         pobj = lfsck_object_find_by_dev(env, dev, pfid);
2245         if (IS_ERR(pobj))
2246                 GOTO(log, rc = PTR_ERR(pobj));
2247
2248         LASSERT(infix != NULL);
2249         LASSERT(type != NULL);
2250
2251         memset(la, 0, sizeof(*la));
2252         la->la_uid = rec->lor_rec.lor_uid;
2253         la->la_gid = rec->lor_rec.lor_gid;
2254         la->la_mode = S_IFREG | S_IRUSR;
2255         la->la_valid = LA_MODE | LA_UID | LA_GID;
2256
2257         memset(dof, 0, sizeof(*dof));
2258         dof->dof_type = dt_mode_to_dft(S_IFREG);
2259         /* Because the dof->dof_reg.striped = 0, the LOD will not create
2260          * the stripe(s). The LFSCK will specify the LOV EA via
2261          * lfsck_layout_update_lovea(). */
2262
2263         size = lfsck_lovea_size(ol, ea_off);
2264         if (ea_buf->lb_len < size) {
2265                 lu_buf_realloc(ea_buf, size);
2266                 if (ea_buf->lb_buf == NULL)
2267                         GOTO(log, rc = -ENOMEM);
2268         }
2269
2270 again:
2271         do {
2272                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2273                          type, idx++);
2274                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2275                                (const struct dt_key *)name);
2276                 if (rc != 0 && rc != -ENOENT)
2277                         GOTO(log, rc);
2278         } while (rc == 0);
2279
2280         rc = lfsck_lock(env, lfsck, lfsck->li_lpf_obj, name, llh,
2281                         MDS_INODELOCK_UPDATE, LCK_PW);
2282         if (rc != 0)
2283                 GOTO(log, rc);
2284
2285         /* Re-check whether the name conflicts with others after taking
2286          * the ldlm lock. */
2287         rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2288                        (const struct dt_key *)name);
2289         if (unlikely(rc == 0)) {
2290                 lfsck_unlock(llh);
2291                 goto again;
2292         }
2293
2294         if (rc != -ENOENT)
2295                 GOTO(unlock, rc);
2296
2297         pname = lfsck_name_get_const(env, name, strlen(name));
2298         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf,
2299                               pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2300         if (rc != 0)
2301                 GOTO(unlock, rc);
2302
2303         /* The 1st transaction. */
2304         th = dt_trans_create(env, dev);
2305         if (IS_ERR(th))
2306                 GOTO(unlock, rc = PTR_ERR(th));
2307
2308         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2309         if (rc != 0)
2310                 GOTO(stop, rc);
2311
2312         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2313         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2314                                   LU_XATTR_CREATE, th);
2315         if (rc != 0)
2316                 GOTO(stop, rc);
2317
2318         dtrec->rec_fid = pfid;
2319         dtrec->rec_type = S_IFREG;
2320         rc = dt_declare_insert(env, lpf,
2321                                (const struct dt_rec *)dtrec,
2322                                (const struct dt_key *)name, th);
2323         if (rc != 0)
2324                 GOTO(stop, rc);
2325
2326         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2327                        ldata.ld_leh->leh_len);
2328         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2329                                   XATTR_NAME_LINK, 0, th);
2330         if (rc != 0)
2331                 GOTO(stop, rc);
2332
2333         rc = dt_trans_start_local(env, dev, th);
2334         if (rc != 0)
2335                 GOTO(stop, rc);
2336
2337         dt_write_lock(env, pobj, 0);
2338         rc = dt_create(env, pobj, la, NULL, dof, th);
2339         if (rc == 0)
2340                 rc = lfsck_layout_update_lovea(env, lfsck, th, ol, pobj, cfid,
2341                         &lov_buf, LU_XATTR_CREATE, ltd->ltd_index, ea_off);
2342         dt_write_unlock(env, pobj);
2343         if (rc < 0)
2344                 GOTO(stop, rc);
2345
2346         rc = dt_insert(env, lpf, (const struct dt_rec *)dtrec,
2347                        (const struct dt_key *)name, th, 1);
2348         if (rc != 0)
2349                 GOTO(stop, rc);
2350
2351         rc = dt_xattr_set(env, pobj, &linkea_buf, XATTR_NAME_LINK, 0, th);
2352         if (rc == 0 && cobj != NULL) {
2353                 dt_trans_stop(env, dev, th);
2354                 th = NULL;
2355
2356                 /* The 2nd transaction. */
2357                 rc = __lfsck_layout_update_pfid(env, cobj, pfid, ol, ea_off);
2358         }
2359
2360         GOTO(stop, rc);
2361
2362 stop:
2363         if (th != NULL)
2364                 dt_trans_stop(env, dev, th);
2365
2366 unlock:
2367         lfsck_unlock(llh);
2368
2369 log:
2370         if (cobj != NULL && !IS_ERR(cobj))
2371                 lfsck_object_put(env, cobj);
2372         if (pobj != NULL && !IS_ERR(pobj))
2373                 lfsck_object_put(env, pobj);
2374
2375         if (rc < 0)
2376                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2377                        "recreate the lost MDT-object: parent "DFID
2378                        ", child "DFID", OST-index %u, stripe-index %u, "
2379                        "infix %s, type %s: rc = %d\n",
2380                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2381                        ltd->ltd_index, ea_off, infix, type, rc);
2382
2383         return rc >= 0 ? 1 : rc;
2384 }
2385
/**
 * Ask an OST to conditionally destroy one of its OST-objects.
 *
 * Looks up the OST target descriptor for \a index, builds an LFSCK_NOTIFY
 * request whose event is LE_CONDITIONAL_DESTROY for \a fid, sends it and
 * waits for the reply.
 *
 * \param[in] env       thread environment; its lfsck_thread_info provides
 *                      the scratch lfsck_request used to build the message
 * \param[in] com       LFSCK component; com->lc_lfsck is the LFSCK instance
 * \param[in] fid       FID copied into the request (lr_fid)
 * \param[in] index     index used to look up the target in li_ost_descs
 *
 * \retval              the value returned by ptlrpc_queue_wait() once the
 *                      request has been sent
 * \retval              -ENXIO if no target descriptor exists for \a index
 * \retval              -EOPNOTSUPP if the export lacks OBD_CONNECT_LFSCK
 * \retval              -ENOMEM if the request cannot be allocated
 * \retval              the ptlrpc_request_pack() error if packing fails
 */
2386 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2387                                                    struct lfsck_component *com,
2388                                                    const struct lu_fid *fid,
2389                                                    __u32 index)
2390 {
2391         struct lfsck_thread_info *info  = lfsck_env_info(env);
2392         struct lfsck_request     *lr    = &info->lti_lr;
2393         struct lfsck_instance    *lfsck = com->lc_lfsck;
2394         struct lfsck_tgt_desc    *ltd;
2395         struct ptlrpc_request    *req;
2396         struct lfsck_request     *tmp;
2397         struct obd_export        *exp;
2398         int                       rc    = 0;
2399         ENTRY;
2400
2401         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2402         if (unlikely(ltd == NULL))
2403                 RETURN(-ENXIO);
2404
             /* The peer must have negotiated OBD_CONNECT_LFSCK, otherwise
              * the request is not sent at all. */
2405         exp = ltd->ltd_exp;
2406         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2407                 GOTO(put, rc = -EOPNOTSUPP);
2408
2409         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2410         if (req == NULL)
2411                 GOTO(put, rc = -ENOMEM);
2412
2413         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2414         if (rc != 0) {
                     /* A request that failed to pack is released with
                      * ptlrpc_request_free(), not ptlrpc_req_finished(). */
2415                 ptlrpc_request_free(req);
2416
2417                 GOTO(put, rc);
2418         }
2419
             /* Fill the per-thread scratch request first, then copy it as a
              * whole into the RMF_LFSCK_REQUEST field of the message. */
2420         memset(lr, 0, sizeof(*lr));
2421         lr->lr_event = LE_CONDITIONAL_DESTROY;
2422         lr->lr_active = LFSCK_TYPE_LAYOUT;
2423         lr->lr_fid = *fid;
2424
2425         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2426         *tmp = *lr;
2427         ptlrpc_request_set_replen(req);
2428
             /* Send and wait; the request is released whatever the result. */
2429         rc = ptlrpc_queue_wait(req);
2430         ptlrpc_req_finished(req);
2431
2432         GOTO(put, rc);
2433
2434 put:
             /* Pairs with the lfsck_tgt_get() at the top. */
2435         lfsck_tgt_put(ltd);
2436
2437         return rc;
2438 }
2439
/**
 * Destroy the local object named by lr->lr_fid if it still looks untouched.
 *
 * The object is looked up on lfsck->li_bottom. It is destroyed only when it
 * exists, lfsck_is_dead_obj() does not report it, and its la_ctime is zero.
 * The attributes are checked twice: once before the extent lock is taken
 * (this check also rejects objects with S_ISUID in la_mode) and once more
 * while holding the EX extent lock and the object write lock.
 *
 * \param[in] env       thread environment; its lfsck_thread_info provides
 *                      the attribute, policy and resource-id scratch buffers
 * \param[in] com       LFSCK component; com->lc_lfsck is the LFSCK instance
 * \param[in] lr        request whose lr_fid names the object
 *
 * \retval              0 if the object was destroyed
 * \retval              -ENOENT if the object does not exist or is dead
 * \retval              -ETXTBSY if either attribute check refuses it
 * \retval              -EIO if the extent lock cannot be enqueued
 * \retval              other negative values from the lookup, dt_attr_get()
 *                      or the transaction calls
 */
2440 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2441                                                   struct lfsck_component *com,
2442                                                   struct lfsck_request *lr)
2443 {
2444         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2445         struct lu_attr                  *la     = &info->lti_la;
2446         union ldlm_policy_data          *policy = &info->lti_policy;
2447         struct ldlm_res_id              *resid  = &info->lti_resid;
2448         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2449         struct dt_device                *dev    = lfsck->li_bottom;
2450         struct lu_fid                   *fid    = &lr->lr_fid;
2451         struct dt_object                *obj;
2452         struct thandle                  *th     = NULL;
2453         struct lustre_handle             lh     = { 0 };
2454         __u64                            flags  = 0;
2455         int                              rc     = 0;
2456         ENTRY;
2457
2458         obj = lfsck_object_find_by_dev(env, dev, fid);
2459         if (IS_ERR(obj))
2460                 RETURN(PTR_ERR(obj));
2461
2462         dt_read_lock(env, obj, 0);
2463         if (dt_object_exists(obj) == 0 ||
2464             lfsck_is_dead_obj(obj)) {
2465                 dt_read_unlock(env, obj);
2466
2467                 GOTO(put, rc = -ENOENT);
2468         }
2469
2470         /* First read the attributes before taking the extent lock. */
2471         rc = dt_attr_get(env, obj, la);
2472         dt_read_unlock(env, obj);
2473         if (rc != 0)
2474                 GOTO(put, rc);
2475
             /* Cheap early exit: a non-zero ctime or the S_ISUID bit means
              * the object must not be destroyed. */
2476         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2477                 GOTO(put, rc = -ETXTBSY);
2478
2479         /* Acquire extent lock on [0, EOF] to sync with all possible writes. */
2480         LASSERT(lfsck->li_namespace != NULL);
2481
2482         memset(policy, 0, sizeof(*policy));
2483         policy->l_extent.end = OBD_OBJECT_EOF;
2484         ost_fid_build_resid(fid, resid);
2485         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2486                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2487                                     ldlm_completion_ast, NULL, NULL, 0,
2488                                     LVB_T_NONE, NULL, &lh);
             /* Any enqueue failure is reported to the caller as -EIO. */
2489         if (rc != ELDLM_OK)
2490                 GOTO(put, rc = -EIO);
2491
2492         dt_write_lock(env, obj, 0);
2493         /* Get obj's attr within lock again. */
2494         rc = dt_attr_get(env, obj, la);
2495         if (rc != 0)
2496                 GOTO(unlock, rc);
2497
             /* NOTE(review): only la_ctime is re-tested here; the S_ISUID
              * bit checked above is not looked at again. Confirm that this
              * is intended. */
2498         if (la->la_ctime != 0)
2499                 GOTO(unlock, rc = -ETXTBSY);
2500
             /* One local transaction: drop the reference, then destroy. */
2501         th = dt_trans_create(env, dev);
2502         if (IS_ERR(th))
2503                 GOTO(unlock, rc = PTR_ERR(th));
2504
2505         rc = dt_declare_ref_del(env, obj, th);
2506         if (rc != 0)
2507                 GOTO(stop, rc);
2508
2509         rc = dt_declare_destroy(env, obj, th);
2510         if (rc != 0)
2511                 GOTO(stop, rc);
2512
2513         rc = dt_trans_start_local(env, dev, th);
2514         if (rc != 0)
2515                 GOTO(stop, rc);
2516
2517         rc = dt_ref_del(env, obj, th);
2518         if (rc != 0)
2519                 GOTO(stop, rc);
2520
2521         rc = dt_destroy(env, obj, th);
2522         if (rc == 0)
2523                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2524                        "OST-object "DFID" that was created for reparing "
2525                        "dangling referenced case. But the original missing "
2526                        "OST-object is found now.\n",
2527                        lfsck_lfsck2name(lfsck), PFID(fid));
2528
2529         GOTO(stop, rc);
2530
             /* Cleanup ladder: each label undoes what was acquired after the
              * previous one, in reverse order of acquisition. */
2531 stop:
2532         dt_trans_stop(env, dev, th);
2533
2534 unlock:
2535         dt_write_unlock(env, obj);
2536         ldlm_lock_decref(&lh, LCK_EX);
2537
2538 put:
2539         lfsck_object_put(env, obj);
2540
2541         return rc;
2542 }
2543
2544 /**
2545  * Some OST-object has occupied the specified layout EA slot.
2546  * Such OST-object may be generated by the LFSCK when repair
2547  * dangling referenced MDT-object, which can be indicated by
2548  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2549  * is true and such OST-object has not been modified yet, we
2550  * will replace it with the orphan OST-object; otherwise the
2551  * LFSCK will create new MDT-object to reference the orphan.
2552  *
      * \param[in] env         thread environment
      * \param[in] com         layout LFSCK component
      * \param[in] ltd         target descriptor of the OST holding the orphan;
      *                        its ltd_index is written into the slot
      * \param[in] rec         orphan record; its parent FID is zeroed when a
      *                        new MDT-object has to be created instead
      * \param[in] parent      MDT-object whose LOV EA contains the slot
      * \param[in] cfid        FID of the orphan OST-object
      * \param[in] ea_buf      buffer holding the LOV EA of \a parent
      * \param[in] lmm         layout header inside \a ea_buf
      * \param[in] slot        the occupied slot inside \a ea_buf
      * \param[in] ea_off      stripe index of the slot
      * \param[in] lovea_size  size passed on to lfsck_layout_refill_lovea()
      *
2553  * \retval       +1: repaired
2554  * \retval        0: did nothing
2555  * \retval      -ve: on error
2556  */
2557 static int lfsck_layout_conflict_create(const struct lu_env *env,
2558                                         struct lfsck_component *com,
2559                                         struct lfsck_tgt_desc *ltd,
2560                                         struct lu_orphan_rec_v2 *rec,
2561                                         struct dt_object *parent,
2562                                         struct lu_fid *cfid,
2563                                         struct lu_buf *ea_buf,
2564                                         struct lov_mds_md_v1 *lmm,
2565                                         struct lov_ost_data_v1 *slot,
2566                                         __u32 ea_off, int lovea_size)
2567 {
2568         struct lfsck_thread_info *info          = lfsck_env_info(env);
2569         struct lu_fid            *cfid2         = &info->lti_fid2;
2570         struct ost_id            *oi            = &info->lti_oi;
2571         struct dt_device         *dev           = lfsck_obj2dev(parent);
2572         struct thandle           *th            = NULL;
2573         struct lustre_handle      lh            = { 0 };
2574         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2575         int                       rc            = 0;
2576         ENTRY;
2577
             /* Presumably a fault-injection delay (OBD_FAIL_LFSCK_DELAY3).
              * While it is active, give up with 0 as soon as the LFSCK
              * thread is no longer running. */
2578         while (CFS_FAIL_TIMEOUT(OBD_FAIL_LFSCK_DELAY3, cfs_fail_val)) {
2579                 if (unlikely(!thread_is_running(&com->lc_lfsck->li_thread)))
2580                         RETURN(0);
2581         }
2582
             /* cfid2 is the FID of the OST-object that currently occupies
              * the slot, rebuilt from the slot content. */
2583         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2584         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2585         if (rc != 0)
2586                 GOTO(out, rc);
2587
2588         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2589                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2590                               LCK_EX);
2591         if (rc != 0)
2592                 GOTO(out, rc);
2593
2594         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2595
2596         /* If the conflict OST-object is not created for fixing dangling
2597          * referenced MDT-object in former LFSCK check/repair, or it has
2598          * been modified by others, then we cannot destroy it. Re-create
2599          * a new MDT-object for the orphan OST-object. */
2600         if (rc == -ETXTBSY) {
2601                 /* No need the layout lock on the original parent. */
2602                 lfsck_ibits_unlock(&lh, LCK_EX);
2603
                     /* A zero parent FID plus an infix built from the old
                      * parent FID and the stripe index is handed to
                      * lfsck_layout_recreate_parent(). This path returns
                      * directly and skips the log message at out. */
2604                 fid_zero(&rec->lor_rec.lor_fid);
2605                 snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf),
2606                          "-"DFID"-%x", PFID(lu_object_fid(&parent->do_lu)),
2607                          ea_off);
2608                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2609                                                 info->lti_tmpbuf, "C", ea_off);
2610
2611                 RETURN(rc);
2612         }
2613
             /* -ENOENT is tolerated and handled like a successful destroy:
              * the slot is rewritten in both cases. */
2614         if (rc != 0 && rc != -ENOENT)
2615                 GOTO(unlock, rc);
2616
2617         th = dt_trans_create(env, dev);
2618         if (IS_ERR(th))
2619                 GOTO(unlock, rc = PTR_ERR(th));
2620
2621         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2622                                   LU_XATTR_REPLACE, th);
2623         if (rc != 0)
2624                 GOTO(stop, rc);
2625
2626         rc = dt_trans_start_local(env, dev, th);
2627         if (rc != 0)
2628                 GOTO(stop, rc);
2629
             /* Bump the layout generation and point the slot at the orphan,
              * all under the parent write lock. */
2630         dt_write_lock(env, parent, 0);
2631         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2632         rc = lfsck_layout_refill_lovea(env, com->lc_lfsck, th, parent, cfid,
2633                                        ea_buf, lmm, slot, LU_XATTR_REPLACE,
2634                                        ltd->ltd_index, lovea_size);
2635         dt_write_unlock(env, parent);
2636
2637         GOTO(stop, rc);
2638
2639 stop:
2640         dt_trans_stop(env, dev, th);
2641
2642 unlock:
2643         lfsck_ibits_unlock(&lh, LCK_EX);
2644
2645 out:
             /* This message is printed on every path that reaches out,
              * failures included; the trailing rc tells them apart. */
2646         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2647                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2648                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2649                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2650                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2651                ea_off, rc);
2652
             /* Any non-negative result is reported as repaired (+1). */
2653         return rc >= 0 ? 1 : rc;
2654 }
2655
2656 /**
      * Make the LOV EA of \a parent reference the orphan OST-object \a cfid
      * at stripe index \a ea_off.
      *
      * Depending on what the current LOV EA looks like, this creates the EA,
      * rebuilds it, adds a component, extends the stripe array, fills a
      * dummy slot, fixes the PFID of the OST-object, or hands over to
      * lfsck_layout_conflict_create() when the slot is occupied by another
      * OST-object. When LPF_DRYRUN is set no transaction is opened and the
      * paths with an explicit dry-run test return 1 without modifying
      * anything.
      *
      * \param[in] env         thread environment; lti_big_buf holds the EA
      * \param[in] com         layout LFSCK component
      * \param[in] ltd         target descriptor of the OST holding the orphan
      * \param[in] rec         orphan record; rec->lor_layout describes the
      *                        expected layout
      * \param[in] parent      MDT-object whose LOV EA is repaired
      * \param[in] cfid        FID of the orphan OST-object
      * \param[in] ost_idx     OST index to store in the slot
      * \param[in] ea_off      stripe index of the slot
      *
2657  * \retval       +1: repaired
2658  * \retval        0: did nothing
2659  * \retval      -ve: on error
2660  */
2661 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2662                                        struct lfsck_component *com,
2663                                        struct lfsck_tgt_desc *ltd,
2664                                        struct lu_orphan_rec_v2 *rec,
2665                                        struct dt_object *parent,
2666                                        struct lu_fid *cfid,
2667                                        __u32 ost_idx, __u32 ea_off)
2668 {
2669         struct lfsck_thread_info *info          = lfsck_env_info(env);
2670         struct lu_buf            *buf           = &info->lti_big_buf;
2671         struct lu_fid            *fid           = &info->lti_fid2;
2672         struct ost_id            *oi            = &info->lti_oi;
2673         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2674         struct dt_device         *dt            = lfsck_obj2dev(parent);
2675         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2676         struct ost_layout        *ol            = &rec->lor_layout;
2677         struct lov_comp_md_v1    *lcm           = NULL;
2678         struct lov_comp_md_entry_v1 *lcme       = NULL;
2679         struct thandle           *handle        = NULL;
2680         size_t                    lovea_size;
2681         struct lov_mds_md_v1     *lmm;
2682         struct lov_ost_data_v1   *objs;
2683         struct lustre_handle      lh            = { 0 };
2684         __u32                     magic;
2685         __u32 flags = 0;
2686         int                       fl            = 0;
2687         int                       rc            = 0;
2688         int                       rc1;
2689         int                       i;
2690         __u16                     count;
2691         bool                      locked        = false;
2692         ENTRY;
2693
2694         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2695                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2696                               LCK_EX);
2697         if (rc != 0) {
2698                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2699                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2700                        "stripe-index %u, comp_id %u, comp_start %llu, "
2701                        "comp_end %llu: rc = %d\n",
2702                        lfsck_lfsck2name(lfsck), PFID(cfid),
2703                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off,
2704                        ol->ol_comp_id, ol->ol_comp_start,
2705                        ol->ol_comp_end, rc);
2706
2707                 RETURN(rc);
2708         }
2709
             /* Retry entry point. On arrival rc is either negative (fatal)
              * or the buffer size to guarantee before the next attempt; it
              * is 0 on the first pass. The write lock and the transaction
              * of the previous attempt, if any, are dropped first. */
2710 again:
2711         if (locked) {
2712                 dt_write_unlock(env, parent);
2713                 locked = false;
2714         }
2715
2716         if (handle != NULL) {
2717                 dt_trans_stop(env, dt, handle);
2718                 handle = NULL;
2719         }
2720
2721         if (rc < 0)
2722                 GOTO(unlock_layout, rc);
2723
2724         lovea_size = rc;
2725         if (buf->lb_len < lovea_size) {
2726                 lu_buf_realloc(buf, lovea_size);
2727                 if (buf->lb_buf == NULL)
2728                         GOTO(unlock_layout, rc = -ENOMEM);
2729         }
2730
             /* In dry-run mode no transaction is opened and handle stays
              * NULL. The xattr set is declared with the current buffer and
              * with fl as it stands: 0 on the first pass, otherwise whatever
              * an earlier pass assigned. */
2731         if (!(bk->lb_param & LPF_DRYRUN)) {
2732                 handle = dt_trans_create(env, dt);
2733                 if (IS_ERR(handle))
2734                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2735
2736                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2737                                           fl, handle);
2738                 if (rc != 0)
2739                         GOTO(stop, rc);
2740
2741                 rc = dt_trans_start_local(env, dt, handle);
2742                 if (rc != 0)
2743                         GOTO(stop, rc);
2744         }
2745
2746         dt_write_lock(env, parent, 0);
2747         locked = true;
2748         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV);
2749         if (rc == -ERANGE) {
                     /* Buffer too small: query again with LU_BUF_NULL and
                      * retry; the result is used as the size at again. */
2750                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV);
2751                 LASSERT(rc != 0);
2752                 goto again;
2753         } else if (rc == -ENODATA || rc == 0) {
                     /* No LOV EA content: it will be created from ol. */
2754                 lovea_size = lfsck_lovea_size(ol, ea_off);
2755                 /* If the declared is not big enough, re-try. */
2756                 if (buf->lb_len < lovea_size) {
2757                         rc = lovea_size;
2758                         goto again;
2759                 }
2760                 fl = LU_XATTR_CREATE;
2761         } else if (rc < 0) {
2762                 GOTO(unlock_parent, rc);
2763         } else if (unlikely(buf->lb_len == 0)) {
                     /* A positive size came back but the buffer is empty:
                      * retry so that rc bytes get allocated. */
2764                 goto again;
2765         } else {
2766                 fl = LU_XATTR_REPLACE;
2767                 lovea_size = rc;
2768         }
2769
2770         if (fl == LU_XATTR_CREATE) {
2771                 if (bk->lb_param & LPF_DRYRUN)
2772                         GOTO(unlock_parent, rc = 1);
2773
2774                 LASSERT(buf->lb_len >= lovea_size);
2775
2776                 rc = lfsck_layout_update_lovea(env, lfsck, handle, ol, parent,
2777                                                cfid, buf, fl, ost_idx, ea_off);
2778
2779                 GOTO(unlock_parent, rc);
2780         }
2781
2782         lmm = buf->lb_buf;
2783         rc1 = lfsck_layout_verify_header(parent, lmm);
2784
2785         /* If the LOV EA crashed, then rebuild it. */
2786         if (rc1 == -EINVAL) {
2787                 if (bk->lb_param & LPF_DRYRUN)
2788                         GOTO(unlock_parent, rc = 1);
2789
2790                 LASSERT(buf->lb_len >= lovea_size);
2791
2792                 rc = lfsck_layout_update_lovea(env, lfsck, handle, ol, parent,
2793                                                cfid, buf, fl, ost_idx, ea_off);
2794
2795                 GOTO(unlock_parent, rc);
2796         }
2797
2798         /* For other unknown magic/pattern, keep the current LOV EA. */
2799         if (rc1 != 0)
2800                 GOTO(unlock_parent, rc = rc1);
2801
2802         magic = le32_to_cpu(lmm->lmm_magic);
2803         if (magic == LOV_MAGIC_COMP_V1) {
2804                 __u64 start;
2805                 __u64 end;
2806
                     /* Composite layout: look for the first component whose
                      * extent overlaps [ol_comp_start, ol_comp_end). When
                      * one is found, lmm, magic and flags are switched to
                      * that component and processing continues at further. */
2807                 lcm = buf->lb_buf;
2808                 count = le16_to_cpu(lcm->lcm_entry_count);
2809                 for (i = 0; i < count; i++) {
2810                         lcme = &lcm->lcm_entries[i];
2811                         start = le64_to_cpu(lcme->lcme_extent.e_start);
2812                         end = le64_to_cpu(lcme->lcme_extent.e_end);
2813
2814                         if (end <= ol->ol_comp_start)
2815                                 continue;
2816
2817                         if (start >= ol->ol_comp_end)
2818                                 break;
2819
2820                         lmm = buf->lb_buf + le32_to_cpu(lcme->lcme_offset);
2821                         magic = le32_to_cpu(lmm->lmm_magic);
2822                         flags = le32_to_cpu(lcme->lcme_flags);
2823                         goto further;
2824                 }
2825
                     /* No overlapping component: add one. i is the index at
                      * which the search stopped. */
2826                 rc = lfsck_layout_add_comp_comp(env, lfsck, handle, ol, parent,
2827                                                cfid, buf, ost_idx, ea_off, i);
2828
2829                 GOTO(unlock_parent, rc);
2830         }
2831
             /* From here on lmm is handled as a V1 or V3 layout: either the
              * whole EA or the component selected above. count is reused
              * for the stripe count. */
2832 further:
2833         count = le16_to_cpu(lmm->lmm_stripe_count);
2834         if (count == 0)
2835                 GOTO(unlock_parent, rc = -EINVAL);
2836         LASSERT(count > 0);
2837
2838         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2839         if (count <= ea_off) {
2840                 if (bk->lb_param & LPF_DRYRUN)
2841                         GOTO(unlock_parent, rc = 1);
2842
2843                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2844                 /* If the declared is not big enough, re-try. */
2845                 if (buf->lb_len < lovea_size) {
2846                         rc = lovea_size;
2847                         goto again;
2848                 }
2849
                     /* A matched component that was not yet initialized is
                      * flagged LCME_FL_INIT before the array is extended. */
2850                 if (lcme && !(flags & LCME_FL_INIT))
2851                         lcme->lcme_flags = cpu_to_le32(flags | LCME_FL_INIT);
2852
2853                 rc = lfsck_layout_extend_v1v3_lovea(env, lfsck, handle, ol,
2854                                         parent, cfid, buf, ost_idx, ea_off);
2855
2856                 GOTO(unlock_parent, rc);
2857         }
2858
             /* rc still holds the positive size returned by dt_xattr_get(). */
2859         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2860
2861         if (magic == LOV_MAGIC_V1) {
2862                 objs = &lmm->lmm_objects[0];
2863         } else {
2864                 LASSERT(magic == LOV_MAGIC_V3);
2865                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2866         }
2867
             /* Scan all slots. A dummy slot (or any slot of a component
              * that is not initialized) is filled only at index ea_off;
              * every other slot is compared against cfid. */
2868         for (i = 0; i < count; i++, objs++) {
2869                 /* The MDT-object was created via lfsck_layout_recover_create()
2870                  * by others before, and we fill the dummy layout EA. */
2871                 if ((lcme && !(flags & LCME_FL_INIT)) ||
2872                      lovea_slot_is_dummy(objs)) {
2873                         if (i != ea_off)
2874                                 continue;
2875
2876                         if (bk->lb_param & LPF_DRYRUN)
2877                                 GOTO(unlock_parent, rc = 1);
2878
2879                         lmm->lmm_layout_gen =
2880                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2881                         if (lcme) {
2882                                 LASSERT(lcm);
2883
                                     /* The component found in the EA must
                                      * agree with the orphan record on stripe
                                      * size, stripe count and extent. */
2884                                 if (le32_to_cpu(lmm->lmm_stripe_size) !=
2885                                         ol->ol_stripe_size ||
2886                                     le16_to_cpu(lmm->lmm_stripe_count) !=
2887                                         ol->ol_stripe_count ||
2888                                     le64_to_cpu(lcme->lcme_extent.e_start) !=
2889                                         ol->ol_comp_start ||
2890                                     le64_to_cpu(lcme->lcme_extent.e_end) !=
2891                                         ol->ol_comp_end) {
2892                                         CDEBUG(D_LFSCK, "%s: found invalid "
2893                                         "component for "DFID ": parent "DFID
2894                                         ", stripe-index %u, stripe_size %u, "
2895                                         "stripe_count %u, comp_id %u, "
2896                                         "comp_start %llu, comp_end %llu, "
2897                                         "cur_stripe_size %u, "
2898                                         "cur_stripe_count %u, "
2899                                         "cur_comp_start %llu, "
2900                                         "cur_comp_end %llu\n",
2901                                         lfsck_lfsck2name(lfsck), PFID(cfid),
2902                                         PFID(lfsck_dto2fid(parent)), ea_off,
2903                                         ol->ol_stripe_size,
2904                                         ol->ol_stripe_count, ol->ol_comp_id,
2905                                         ol->ol_comp_start, ol->ol_comp_end,
2906                                         le32_to_cpu(lmm->lmm_stripe_size),
2907                                         le16_to_cpu(lmm->lmm_stripe_count),
2908                                         le64_to_cpu(lcme->lcme_extent.e_start),
2909                                         le64_to_cpu(lcme->lcme_extent.e_end));
2910
2911                                         GOTO(unlock_parent, rc = -EINVAL);
2912                                 }
2913
                                     /* For a composite layout the size to
                                      * write is the size of the whole EA. */
2914                                 le32_add_cpu(&lcm->lcm_layout_gen, 1);
2915                                 lovea_size = le32_to_cpu(lcm->lcm_size);
2916                                 if (!(flags & LCME_FL_INIT))
2917                                         lcme->lcme_flags = cpu_to_le32(flags |
2918                                                                 LCME_FL_INIT);
2919                         }
2920
2921                         LASSERTF(buf->lb_len >= lovea_size,
2922                                  "buffer len %d is less than real size %d\n",
2923                                  (int)buf->lb_len, (int)lovea_size);
2924
2925                         rc = lfsck_layout_refill_lovea(env, lfsck, handle,
2926                                                 parent, cfid, buf, lmm, objs,
2927                                                 fl, ost_idx, lovea_size);
2928
2929                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2930                                "dummy layout slot for "DFID": parent "DFID
2931                                ", OST-index %u, stripe-index %u: rc = %d\n",
2932                                lfsck_lfsck2name(lfsck), PFID(cfid),
2933                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2934
2935                         GOTO(unlock_parent, rc);
2936                 }
2937
2938                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2939                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2940                 if (rc != 0) {
2941                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2942                                "invalid layout EA at the slot %d, index %u\n",
2943                                lfsck_lfsck2name(lfsck),
2944                                PFID(lfsck_dto2fid(parent)), i,
2945                                le32_to_cpu(objs->l_ost_idx));
2946
2947                         GOTO(unlock_parent, rc);
2948                 }
2949
2950                 /* It should be rare case, the slot is there, but the LFSCK
2951                  * does not handle it during the first-phase cycle scanning. */
2952                 if (unlikely(lu_fid_eq(fid, cfid))) {
2953                         if (i == ea_off) {
2954                                 GOTO(unlock_parent, rc = 0);
2955                         } else {
2956                                 /* Rare case that the OST-object index
2957                                  * does not match the parent MDT-object
2958                                  * layout EA. We trust the later one. */
2959                                 if (bk->lb_param & LPF_DRYRUN)
2960                                         GOTO(unlock_parent, rc = 1);
2961
                                     /* Everything held here is released by
                                      * hand before the PFID update, and the
                                      * function returns without going
                                      * through the cleanup labels. */
2962                                 dt_write_unlock(env, parent);
2963                                 if (handle != NULL)
2964                                         dt_trans_stop(env, dt, handle);
2965                                 lfsck_ibits_unlock(&lh, LCK_EX);
2966                                 rc = lfsck_layout_update_pfid(env, com, parent,
2967                                                         cfid, ltd->ltd_tgt,
2968                                                         ol, i);
2969
2970                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2971                                        "updated OST-object's pfid for "DFID
2972                                        ": parent "DFID", OST-index %u, "
2973                                        "stripe-index %u: rc = %d\n",
2974                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2975                                        PFID(lfsck_dto2fid(parent)),
2976                                        ltd->ltd_index, i, rc);
2977
2978                                 RETURN(rc);
2979                         }
2980                 }
2981         }
2982
2983         /* The MDT-object exists, but related layout EA slot is occupied
2984          * by others. */
2985         if (bk->lb_param & LPF_DRYRUN)
2986                 GOTO(unlock_parent, rc = 1);
2987
             /* Release the write lock, the transaction and the layout lock
              * by hand before calling lfsck_layout_conflict_create(). objs
              * is recomputed because the loop above advanced it past the
              * last slot. */
2988         dt_write_unlock(env, parent);
2989         if (handle != NULL)
2990                 dt_trans_stop(env, dt, handle);
2991         lfsck_ibits_unlock(&lh, LCK_EX);
2992         if (magic == LOV_MAGIC_V1)
2993                 objs = &lmm->lmm_objects[ea_off];
2994         else
2995                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2996         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2997                                           buf, lmm, objs, ea_off, lovea_size);
2998
2999         RETURN(rc);
3000
             /* Cleanup ladder for every path that still holds resources. */
3001 unlock_parent:
3002         if (locked)
3003                 dt_write_unlock(env, parent);
3004
3005 stop:
3006         if (handle != NULL)
3007                 dt_trans_stop(env, dt, handle);
3008
3009 unlock_layout:
3010         lfsck_ibits_unlock(&lh, LCK_EX);
3011
3012         return rc;
3013 }
3014
3015 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
3016                                         struct lfsck_component *com,
3017                                         struct lfsck_tgt_desc *ltd,
3018                                         struct lu_orphan_rec_v2 *rec,
3019                                         struct lu_fid *cfid)
3020 {
3021         struct lfsck_layout     *lo     = com->lc_file_ram;
3022         struct lu_fid           *pfid   = &rec->lor_rec.lor_fid;
3023         struct dt_object        *parent = NULL;
3024         __u32                    ea_off = pfid->f_stripe_idx;
3025         int                      rc     = 0;
3026         ENTRY;
3027
3028         if (!fid_is_sane(cfid))
3029                 GOTO(out, rc = -EINVAL);
3030
3031         pfid->f_ver = 0;
3032         if (fid_is_zero(pfid)) {
3033                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
3034                                                   "", "N", ea_off);
3035                 GOTO(out, rc);
3036         }
3037
3038         if (!fid_is_sane(pfid))
3039                 GOTO(out, rc = -EINVAL);
3040
3041         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3042         if (IS_ERR(parent))
3043                 GOTO(out, rc = PTR_ERR(parent));
3044
3045         if (unlikely(dt_object_remote(parent) != 0))
3046                 GOTO(put, rc = -EXDEV);
3047
3048         if (dt_object_exists(parent) == 0) {
3049                 lfsck_object_put(env, parent);
3050                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
3051                                                   "", "R", ea_off);
3052                 GOTO(out, rc);
3053         }
3054
3055         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
3056                 GOTO(put, rc = -EISDIR);
3057
3058         /* The orphan OST-object claims to be the parent's stripe, then
3059          * related dangling record in the trace file is meaningless. */
3060         rc = lfsck_layout_del_dangling_rec(env, com, pfid,
3061                                            rec->lor_layout.ol_comp_id, ea_off);
3062         if (rc && rc != -ENOENT)
3063                 GOTO(put, rc);
3064
3065         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
3066                                          ltd->ltd_index, ea_off);
3067
3068         GOTO(put, rc);
3069
3070 put:
3071         if (rc <= 0)
3072                 lfsck_object_put(env, parent);
3073         else
3074                 /* The layout EA is changed, need to be reloaded next time. */
3075                 dt_object_put_nocache(env, parent);
3076
3077 out:
3078         down_write(&com->lc_sem);
3079         com->lc_new_scanned++;
3080         com->lc_new_checked++;
3081         if (rc > 0) {
3082                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
3083                 rc = 0;
3084         } else if (rc < 0) {
3085                 lo->ll_objs_failed_phase2++;
3086         }
3087         up_write(&com->lc_sem);
3088
3089         return rc;
3090 }
3091
3092 static int lfsck_layout_scan_orphan(const struct lu_env *env,
3093                                     struct lfsck_component *com,
3094                                     struct lfsck_tgt_desc *ltd)
3095 {
3096         struct lfsck_assistant_data     *lad    = com->lc_data;
3097         struct lfsck_instance           *lfsck  = com->lc_lfsck;