Whamcloud - gitweb
aadf08e5acf6d9de5cd12471e2b8c47931937579
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic value written to lfsck_layout::ll_magic by lfsck_layout_init() and
 * verified by lfsck_layout_load(). */
#define LFSCK_LAYOUT_MAGIC		0xB173AE14

/* Name string of the layout LFSCK component.
 * NOTE(review): its users are outside this part of the file. */
static const char lfsck_layout_name[] = "lfsck_layout";
54
55 struct lfsck_layout_seq {
56         struct list_head         lls_list;
57         __u64                    lls_seq;
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
60         struct dt_object        *lls_lastid_obj;
61         unsigned int             lls_dirty:1;
62 };
63
64 struct lfsck_layout_slave_target {
65         /* link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
67         /* The position for next record in the rbtree for iteration. */
68         struct lu_fid           llst_fid;
69         /* Dummy hash for iteration against the rbtree. */
70         __u64                   llst_hash;
71         __u64                   llst_gen;
72         atomic_t                llst_ref;
73         __u32                   llst_index;
74 };
75
76 struct lfsck_layout_slave_data {
77         /* list for lfsck_layout_seq */
78         struct list_head         llsd_seq_list;
79
80         /* list for the masters involve layout verification. */
81         struct list_head         llsd_master_list;
82         spinlock_t               llsd_lock;
83         __u64                    llsd_touch_gen;
84         struct dt_object        *llsd_rb_obj;
85         struct rb_root           llsd_rb_root;
86         rwlock_t                 llsd_rb_lock;
87         unsigned int             llsd_rbtree_valid:1;
88 };
89
90 struct lfsck_layout_object {
91         struct dt_object        *llo_obj;
92         struct lu_attr           llo_attr;
93         atomic_t                 llo_ref;
94         __u64                    llo_cookie;
95         __u16                    llo_gen;
96 };
97
98 struct lfsck_layout_req {
99         struct lfsck_assistant_req       llr_lar;
100         struct lfsck_layout_object      *llr_parent;
101         struct dt_object                *llr_child;
102         __u32                            llr_ost_idx;
103         __u32                            llr_lov_idx; /* offset in LOV EA */
104 };
105
/*
 * Bundle of an export, a component and a slave target.
 * NOTE(review): judging by the name these are the arguments of an
 * asynchronous slave-side request; the users are outside this part of the
 * file.
 */
struct lfsck_layout_slave_async_args {
	struct obd_export *llsaa_exp;
	struct lfsck_component *llsaa_com;
	struct lfsck_layout_slave_target *llsaa_llst;
};
111
112 static struct lfsck_layout_object *
113 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
114                          __u64 cookie, __u16 gen)
115 {
116         struct lfsck_layout_object *llo;
117         int                         rc;
118
119         OBD_ALLOC_PTR(llo);
120         if (llo == NULL)
121                 return ERR_PTR(-ENOMEM);
122
123         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
124         if (rc != 0) {
125                 OBD_FREE_PTR(llo);
126
127                 return ERR_PTR(rc);
128         }
129
130         lu_object_get(&obj->do_lu);
131         llo->llo_obj = obj;
132         llo->llo_cookie = cookie;
133         /* The gen can be used to check whether some others have changed the
134          * file layout after LFSCK pre-fetching but before real verification. */
135         llo->llo_gen = gen;
136         atomic_set(&llo->llo_ref, 1);
137
138         return llo;
139 }
140
141 static inline void
142 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
143 {
144         if (atomic_dec_and_test(&llst->llst_ref)) {
145                 LASSERT(list_empty(&llst->llst_list));
146
147                 OBD_FREE_PTR(llst);
148         }
149 }
150
151 static inline int
152 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
153 {
154         struct lfsck_layout_slave_target *llst;
155         struct lfsck_layout_slave_target *tmp;
156         int                               rc   = 0;
157
158         OBD_ALLOC_PTR(llst);
159         if (llst == NULL)
160                 return -ENOMEM;
161
162         INIT_LIST_HEAD(&llst->llst_list);
163         llst->llst_gen = 0;
164         llst->llst_index = index;
165         atomic_set(&llst->llst_ref, 1);
166
167         spin_lock(&llsd->llsd_lock);
168         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
169                 if (tmp->llst_index == index) {
170                         rc = -EALREADY;
171                         break;
172                 }
173         }
174         if (rc == 0)
175                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
176         spin_unlock(&llsd->llsd_lock);
177
178         if (rc != 0)
179                 OBD_FREE_PTR(llst);
180
181         return rc;
182 }
183
184 static inline void
185 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
186                       struct lfsck_layout_slave_target *llst)
187 {
188         bool del = false;
189
190         spin_lock(&llsd->llsd_lock);
191         if (!list_empty(&llst->llst_list)) {
192                 list_del_init(&llst->llst_list);
193                 del = true;
194         }
195         spin_unlock(&llsd->llsd_lock);
196
197         if (del)
198                 lfsck_layout_llst_put(llst);
199 }
200
201 static inline struct lfsck_layout_slave_target *
202 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
203                                __u32 index, bool unlink)
204 {
205         struct lfsck_layout_slave_target *llst;
206
207         spin_lock(&llsd->llsd_lock);
208         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
209                 if (llst->llst_index == index) {
210                         if (unlink)
211                                 list_del_init(&llst->llst_list);
212                         else
213                                 atomic_inc(&llst->llst_ref);
214                         spin_unlock(&llsd->llsd_lock);
215
216                         return llst;
217                 }
218         }
219         spin_unlock(&llsd->llsd_lock);
220
221         return NULL;
222 }
223
224 static inline void lfsck_layout_object_put(const struct lu_env *env,
225                                            struct lfsck_layout_object *llo)
226 {
227         if (atomic_dec_and_test(&llo->llo_ref)) {
228                 lfsck_object_put(env, llo->llo_obj);
229                 OBD_FREE_PTR(llo);
230         }
231 }
232
233 static struct lfsck_layout_req *
234 lfsck_layout_assistant_req_init(struct lfsck_layout_object *parent,
235                                 struct dt_object *child, __u32 ost_idx,
236                                 __u32 lov_idx)
237 {
238         struct lfsck_layout_req *llr;
239
240         OBD_ALLOC_PTR(llr);
241         if (llr == NULL)
242                 return ERR_PTR(-ENOMEM);
243
244         INIT_LIST_HEAD(&llr->llr_lar.lar_list);
245         atomic_inc(&parent->llo_ref);
246         llr->llr_parent = parent;
247         llr->llr_child = child;
248         llr->llr_ost_idx = ost_idx;
249         llr->llr_lov_idx = lov_idx;
250
251         return llr;
252 }
253
254 static void lfsck_layout_assistant_req_fini(const struct lu_env *env,
255                                             struct lfsck_assistant_req *lar)
256 {
257         struct lfsck_layout_req *llr =
258                         container_of0(lar, struct lfsck_layout_req, llr_lar);
259
260         lu_object_put(env, &llr->llr_child->do_lu);
261         lfsck_layout_object_put(env, llr->llr_parent);
262         OBD_FREE_PTR(llr);
263 }
264
265 static int lfsck_layout_get_lovea(const struct lu_env *env,
266                                   struct dt_object *obj, struct lu_buf *buf)
267 {
268         int rc;
269
270 again:
271         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
272         if (rc == -ERANGE) {
273                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
274                                   BYPASS_CAPA);
275                 if (rc <= 0)
276                         return rc;
277
278                 lu_buf_realloc(buf, rc);
279                 if (buf->lb_buf == NULL)
280                         return -ENOMEM;
281
282                 goto again;
283         }
284
285         if (rc == -ENODATA)
286                 rc = 0;
287
288         if (rc <= 0)
289                 return rc;
290
291         if (unlikely(buf->lb_buf == NULL)) {
292                 lu_buf_alloc(buf, rc);
293                 if (buf->lb_buf == NULL)
294                         return -ENOMEM;
295
296                 goto again;
297         }
298
299         return rc;
300 }
301
302 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
303 {
304         __u32 magic;
305         __u32 pattern;
306
307         magic = le32_to_cpu(lmm->lmm_magic);
308         /* If magic crashed, keep it there. Sometime later, during OST-object
309          * orphan handling, if some OST-object(s) back-point to it, it can be
310          * verified and repaired. */
311         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
312                 struct ost_id   oi;
313                 int             rc;
314
315                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
316                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
317                         rc = -EOPNOTSUPP;
318                 else
319                         rc = -EINVAL;
320
321                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
322                        rc == -EINVAL ? "Unknown" : "Unsupported",
323                        magic, POSTID(&oi));
324
325                 return rc;
326         }
327
328         pattern = le32_to_cpu(lmm->lmm_pattern);
329         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
330         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
331                 struct ost_id oi;
332
333                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
334                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
335                        pattern, POSTID(&oi));
336
337                 return -EOPNOTSUPP;
338         }
339
340         return 0;
341 }
342
/* Size in bytes of each bitmap hanging off a lfsck_rbtree_node. */
#define LFSCK_RBTREE_BITMAP_SIZE	PAGE_CACHE_SIZE
/* Number of bits, i.e. of object IDs, covered by one bitmap. */
#define LFSCK_RBTREE_BITMAP_WIDTH	(LFSCK_RBTREE_BITMAP_SIZE << 3)
/* Mask extracting the bit offset of an object ID inside its bitmap. */
#define LFSCK_RBTREE_BITMAP_MASK	(LFSCK_RBTREE_BITMAP_WIDTH - 1)
346
347 struct lfsck_rbtree_node {
348         struct rb_node   lrn_node;
349         __u64            lrn_seq;
350         __u32            lrn_first_oid;
351         atomic_t         lrn_known_count;
352         atomic_t         lrn_accessed_count;
353         void            *lrn_known_bitmap;
354         void            *lrn_accessed_bitmap;
355 };
356
357 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
358                                    __u64 seq, __u32 oid)
359 {
360         if (seq < lrn->lrn_seq)
361                 return -1;
362
363         if (seq > lrn->lrn_seq)
364                 return 1;
365
366         if (oid < lrn->lrn_first_oid)
367                 return -1;
368
369         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
370                 return 1;
371
372         return 0;
373 }
374
375 /* The caller should hold llsd->llsd_rb_lock. */
376 static struct lfsck_rbtree_node *
377 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
378                     const struct lu_fid *fid, bool *exact)
379 {
380         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
381         struct rb_node           *prev  = NULL;
382         struct lfsck_rbtree_node *lrn   = NULL;
383         int                       rc    = 0;
384
385         if (exact != NULL)
386                 *exact = true;
387
388         while (node != NULL) {
389                 prev = node;
390                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
391                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
392                 if (rc < 0)
393                         node = node->rb_left;
394                 else if (rc > 0)
395                         node = node->rb_right;
396                 else
397                         return lrn;
398         }
399
400         if (exact == NULL)
401                 return NULL;
402
403         /* If there is no exactly matched one, then to the next valid one. */
404         *exact = false;
405
406         /* The rbtree is empty. */
407         if (rc == 0)
408                 return NULL;
409
410         if (rc < 0)
411                 return lrn;
412
413         node = rb_next(prev);
414
415         /* The end of the rbtree. */
416         if (node == NULL)
417                 return NULL;
418
419         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
420
421         return lrn;
422 }
423
424 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
425                                                   const struct lu_fid *fid)
426 {
427         struct lfsck_rbtree_node *lrn;
428
429         OBD_ALLOC_PTR(lrn);
430         if (lrn == NULL)
431                 return ERR_PTR(-ENOMEM);
432
433         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
434         if (lrn->lrn_known_bitmap == NULL) {
435                 OBD_FREE_PTR(lrn);
436
437                 return ERR_PTR(-ENOMEM);
438         }
439
440         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
441         if (lrn->lrn_accessed_bitmap == NULL) {
442                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
443                 OBD_FREE_PTR(lrn);
444
445                 return ERR_PTR(-ENOMEM);
446         }
447
448         RB_CLEAR_NODE(&lrn->lrn_node);
449         lrn->lrn_seq = fid_seq(fid);
450         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
451         atomic_set(&lrn->lrn_known_count, 0);
452         atomic_set(&lrn->lrn_accessed_count, 0);
453
454         return lrn;
455 }
456
457 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
458 {
459         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
460         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
461         OBD_FREE_PTR(lrn);
462 }
463
464 /* The caller should hold lock. */
465 static struct lfsck_rbtree_node *
466 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
467                     struct lfsck_rbtree_node *lrn)
468 {
469         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
470         struct rb_node            *parent = NULL;
471         struct lfsck_rbtree_node  *tmp;
472         int                        rc;
473
474         while (*pos != NULL) {
475                 parent = *pos;
476                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
477                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
478                 if (rc < 0)
479                         pos = &(*pos)->rb_left;
480                 else if (rc > 0)
481                         pos = &(*pos)->rb_right;
482                 else
483                         return tmp;
484         }
485
486         rb_link_node(&lrn->lrn_node, parent, pos);
487         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
488
489         return lrn;
490 }
491
/* Index operations that lfsck_rbtree_setup() installs on the in-RAM rbtree
 * object; the definition lives outside this part of the file. */
extern const struct dt_index_operations lfsck_orphan_index_ops;
493
494 static int lfsck_rbtree_setup(const struct lu_env *env,
495                               struct lfsck_component *com)
496 {
497         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
498         struct lfsck_instance           *lfsck  = com->lc_lfsck;
499         struct dt_device                *dev    = lfsck->li_bottom;
500         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
501         struct dt_object                *obj;
502
503         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
504         fid->f_oid = lfsck_dev_idx(dev);
505         fid->f_ver = 0;
506         obj = dt_locate(env, dev, fid);
507         if (IS_ERR(obj))
508                 RETURN(PTR_ERR(obj));
509
510         /* Generate an in-RAM object to stand for the layout rbtree.
511          * Scanning the layout rbtree will be via the iteration over
512          * the object. In the future, the rbtree may be written onto
513          * disk with the object.
514          *
515          * Mark the object to be as exist. */
516         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
517         obj->do_index_ops = &lfsck_orphan_index_ops;
518         llsd->llsd_rb_obj = obj;
519         llsd->llsd_rbtree_valid = 1;
520         dev->dd_record_fid_accessed = 1;
521
522         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
523                lfsck_lfsck2name(lfsck));
524
525         return 0;
526 }
527
528 static void lfsck_rbtree_cleanup(const struct lu_env *env,
529                                  struct lfsck_component *com)
530 {
531         struct lfsck_instance           *lfsck = com->lc_lfsck;
532         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
533         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
534         struct rb_node                  *next;
535         struct lfsck_rbtree_node        *lrn;
536
537         lfsck->li_bottom->dd_record_fid_accessed = 0;
538         /* Invalid the rbtree, then no others will use it. */
539         write_lock(&llsd->llsd_rb_lock);
540         llsd->llsd_rbtree_valid = 0;
541         write_unlock(&llsd->llsd_rb_lock);
542
543         while (node != NULL) {
544                 next = rb_next(node);
545                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
546                 rb_erase(node, &llsd->llsd_rb_root);
547                 lfsck_rbtree_free(lrn);
548                 node = next;
549         }
550
551         if (llsd->llsd_rb_obj != NULL) {
552                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
553                 llsd->llsd_rb_obj = NULL;
554         }
555
556         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
557                lfsck_lfsck2name(lfsck));
558 }
559
560 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
561                                        struct lfsck_component *com,
562                                        const struct lu_fid *fid,
563                                        bool accessed)
564 {
565         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
566         struct lfsck_rbtree_node        *lrn;
567         bool                             insert = false;
568         int                              idx;
569         int                              rc     = 0;
570         ENTRY;
571
572         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
573                 RETURN_EXIT;
574
575         if (!fid_is_idif(fid) && !fid_is_norm(fid))
576                 RETURN_EXIT;
577
578         read_lock(&llsd->llsd_rb_lock);
579         if (!llsd->llsd_rbtree_valid)
580                 GOTO(unlock, rc = 0);
581
582         lrn = lfsck_rbtree_search(llsd, fid, NULL);
583         if (lrn == NULL) {
584                 struct lfsck_rbtree_node *tmp;
585
586                 LASSERT(!insert);
587
588                 read_unlock(&llsd->llsd_rb_lock);
589                 tmp = lfsck_rbtree_new(env, fid);
590                 if (IS_ERR(tmp))
591                         GOTO(out, rc = PTR_ERR(tmp));
592
593                 insert = true;
594                 write_lock(&llsd->llsd_rb_lock);
595                 if (!llsd->llsd_rbtree_valid) {
596                         lfsck_rbtree_free(tmp);
597                         GOTO(unlock, rc = 0);
598                 }
599
600                 lrn = lfsck_rbtree_insert(llsd, tmp);
601                 if (lrn != tmp)
602                         lfsck_rbtree_free(tmp);
603         }
604
605         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
606         /* Any accessed object must be a known object. */
607         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
608                 atomic_inc(&lrn->lrn_known_count);
609         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
610                 atomic_inc(&lrn->lrn_accessed_count);
611
612         GOTO(unlock, rc = 0);
613
614 unlock:
615         if (insert)
616                 write_unlock(&llsd->llsd_rb_lock);
617         else
618                 read_unlock(&llsd->llsd_rb_lock);
619 out:
620         if (rc != 0 && accessed) {
621                 struct lfsck_layout *lo = com->lc_file_ram;
622
623                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
624                        "bitmap, and will cause incorrect LFSCK OST-object "
625                        "handling, so disable it to cancel orphan handling "
626                        "for related device. rc = %d\n",
627                        lfsck_lfsck2name(com->lc_lfsck), rc);
628
629                 lo->ll_flags |= LF_INCOMPLETE;
630                 lfsck_rbtree_cleanup(env, com);
631         }
632 }
633
634 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
635                                    const struct lfsck_layout *src)
636 {
637         int i;
638
639         des->ll_magic = le32_to_cpu(src->ll_magic);
640         des->ll_status = le32_to_cpu(src->ll_status);
641         des->ll_flags = le32_to_cpu(src->ll_flags);
642         des->ll_success_count = le32_to_cpu(src->ll_success_count);
643         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
644         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
645         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
646         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
647         des->ll_time_last_checkpoint =
648                                 le64_to_cpu(src->ll_time_last_checkpoint);
649         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
650         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
651         des->ll_pos_first_inconsistent =
652                         le64_to_cpu(src->ll_pos_first_inconsistent);
653         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
654         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
655         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
656         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
657         for (i = 0; i < LLIT_MAX; i++)
658                 des->ll_objs_repaired[i] =
659                                 le64_to_cpu(src->ll_objs_repaired[i]);
660         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
661 }
662
663 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
664                                    const struct lfsck_layout *src)
665 {
666         int i;
667
668         des->ll_magic = cpu_to_le32(src->ll_magic);
669         des->ll_status = cpu_to_le32(src->ll_status);
670         des->ll_flags = cpu_to_le32(src->ll_flags);
671         des->ll_success_count = cpu_to_le32(src->ll_success_count);
672         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
673         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
674         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
675         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
676         des->ll_time_last_checkpoint =
677                                 cpu_to_le64(src->ll_time_last_checkpoint);
678         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
679         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
680         des->ll_pos_first_inconsistent =
681                         cpu_to_le64(src->ll_pos_first_inconsistent);
682         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
683         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
684         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
685         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
686         for (i = 0; i < LLIT_MAX; i++)
687                 des->ll_objs_repaired[i] =
688                                 cpu_to_le64(src->ll_objs_repaired[i]);
689         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
690 }
691
692 /**
693  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
694  * \retval 0: succeed.
695  * \retval -ve: failed cases.
696  */
697 static int lfsck_layout_load(const struct lu_env *env,
698                              struct lfsck_component *com)
699 {
700         struct lfsck_layout             *lo     = com->lc_file_ram;
701         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
702         ssize_t                          size   = com->lc_file_size;
703         loff_t                           pos    = 0;
704         int                              rc;
705
706         rc = dbo->dbo_read(env, com->lc_obj,
707                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
708                            BYPASS_CAPA);
709         if (rc == 0) {
710                 return -ENOENT;
711         } else if (rc < 0) {
712                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
713                        lfsck_lfsck2name(com->lc_lfsck), rc);
714                 return rc;
715         } else if (rc != size) {
716                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
717                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
718                 return 1;
719         }
720
721         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
722         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
723                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
724                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
725                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
726                 return 1;
727         }
728
729         return 0;
730 }
731
732 static int lfsck_layout_store(const struct lu_env *env,
733                               struct lfsck_component *com)
734 {
735         struct dt_object         *obj           = com->lc_obj;
736         struct lfsck_instance    *lfsck         = com->lc_lfsck;
737         struct lfsck_layout      *lo            = com->lc_file_disk;
738         struct thandle           *handle;
739         ssize_t                   size          = com->lc_file_size;
740         loff_t                    pos           = 0;
741         int                       rc;
742         ENTRY;
743
744         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
745         handle = dt_trans_create(env, lfsck->li_bottom);
746         if (IS_ERR(handle))
747                 GOTO(log, rc = PTR_ERR(handle));
748
749         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
750                                      pos, handle);
751         if (rc != 0)
752                 GOTO(out, rc);
753
754         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
755         if (rc != 0)
756                 GOTO(out, rc);
757
758         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
759                              handle);
760
761         GOTO(out, rc);
762
763 out:
764         dt_trans_stop(env, lfsck->li_bottom, handle);
765
766 log:
767         if (rc != 0)
768                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
769                        lfsck_lfsck2name(lfsck), rc);
770         return rc;
771 }
772
773 static int lfsck_layout_init(const struct lu_env *env,
774                              struct lfsck_component *com)
775 {
776         struct lfsck_layout *lo = com->lc_file_ram;
777         int rc;
778
779         memset(lo, 0, com->lc_file_size);
780         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
781         lo->ll_status = LS_INIT;
782         down_write(&com->lc_sem);
783         rc = lfsck_layout_store(env, com);
784         up_write(&com->lc_sem);
785
786         return rc;
787 }
788
789 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
790                              struct dt_object *obj, const struct lu_fid *fid)
791 {
792         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
793         struct lu_seq_range      range  = { 0 };
794         struct lustre_mdt_attrs *lma;
795         int                      rc;
796
797         fld_range_set_any(&range);
798         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
799         if (rc == 0) {
800                 if (fld_range_is_ost(&range))
801                         return 1;
802
803                 return 0;
804         }
805
806         lma = &lfsck_env_info(env)->lti_lma;
807         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
808                           XATTR_NAME_LMA, BYPASS_CAPA);
809         if (rc == sizeof(*lma)) {
810                 lustre_lma_swab(lma);
811
812                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
813         }
814
815         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
816
817         return rc > 0;
818 }
819
820 static struct lfsck_layout_seq *
821 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
822 {
823         struct lfsck_layout_seq *lls;
824
825         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
826                 if (lls->lls_seq == seq)
827                         return lls;
828
829                 if (lls->lls_seq > seq)
830                         return NULL;
831         }
832
833         return NULL;
834 }
835
836 static void
837 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
838                         struct lfsck_layout_seq *lls)
839 {
840         struct lfsck_layout_seq *tmp;
841         struct list_head        *pos = &llsd->llsd_seq_list;
842
843         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
844                 if (lls->lls_seq < tmp->lls_seq) {
845                         pos = &tmp->lls_list;
846                         break;
847                 }
848         }
849         list_add_tail(&lls->lls_list, pos);
850 }
851
852 static int
853 lfsck_layout_lastid_create(const struct lu_env *env,
854                            struct lfsck_instance *lfsck,
855                            struct dt_object *obj)
856 {
857         struct lfsck_thread_info *info   = lfsck_env_info(env);
858         struct lu_attr           *la     = &info->lti_la;
859         struct dt_object_format  *dof    = &info->lti_dof;
860         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
861         struct dt_device         *dt     = lfsck->li_bottom;
862         struct thandle           *th;
863         __u64                     lastid = 0;
864         loff_t                    pos    = 0;
865         int                       rc;
866         ENTRY;
867
868         if (bk->lb_param & LPF_DRYRUN)
869                 return 0;
870
871         memset(la, 0, sizeof(*la));
872         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
873         la->la_valid = LA_MODE | LA_UID | LA_GID;
874         dof->dof_type = dt_mode_to_dft(S_IFREG);
875
876         th = dt_trans_create(env, dt);
877         if (IS_ERR(th))
878                 GOTO(log, rc = PTR_ERR(th));
879
880         rc = dt_declare_create(env, obj, la, NULL, dof, th);
881         if (rc != 0)
882                 GOTO(stop, rc);
883
884         rc = dt_declare_record_write(env, obj,
885                                      lfsck_buf_get(env, &lastid,
886                                                    sizeof(lastid)),
887                                      pos, th);
888         if (rc != 0)
889                 GOTO(stop, rc);
890
891         rc = dt_trans_start_local(env, dt, th);
892         if (rc != 0)
893                 GOTO(stop, rc);
894
895         dt_write_lock(env, obj, 0);
896         if (likely(dt_object_exists(obj) == 0)) {
897                 rc = dt_create(env, obj, la, NULL, dof, th);
898                 if (rc == 0)
899                         rc = dt_record_write(env, obj,
900                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
901                                 &pos, th);
902         }
903         dt_write_unlock(env, obj);
904
905         GOTO(stop, rc);
906
907 stop:
908         dt_trans_stop(env, dt, th);
909
910 log:
911         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
912                LPX64": rc = %d\n",
913                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
914
915         return rc;
916 }
917
918 static int
919 lfsck_layout_lastid_reload(const struct lu_env *env,
920                            struct lfsck_component *com,
921                            struct lfsck_layout_seq *lls)
922 {
923         __u64   lastid;
924         loff_t  pos     = 0;
925         int     rc;
926
927         dt_read_lock(env, lls->lls_lastid_obj, 0);
928         rc = dt_record_read(env, lls->lls_lastid_obj,
929                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
930         dt_read_unlock(env, lls->lls_lastid_obj);
931         if (unlikely(rc != 0))
932                 return rc;
933
934         lastid = le64_to_cpu(lastid);
935         if (lastid < lls->lls_lastid_known) {
936                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
937                 struct lfsck_layout     *lo     = com->lc_file_ram;
938
939                 lls->lls_lastid = lls->lls_lastid_known;
940                 lls->lls_dirty = 1;
941                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
942                         LASSERT(lfsck->li_out_notify != NULL);
943
944                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
945                                              LE_LASTID_REBUILDING);
946                         lo->ll_flags |= LF_CRASHED_LASTID;
947
948                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
949                                "LAST_ID file (1) for the sequence "LPX64
950                                ", old value "LPU64", known value "LPU64"\n",
951                                lfsck_lfsck2name(lfsck), lls->lls_seq,
952                                lastid, lls->lls_lastid);
953                 }
954         } else if (lastid >= lls->lls_lastid) {
955                 lls->lls_lastid = lastid;
956                 lls->lls_dirty = 0;
957         }
958
959         return 0;
960 }
961
962 static int
963 lfsck_layout_lastid_store(const struct lu_env *env,
964                           struct lfsck_component *com)
965 {
966         struct lfsck_instance           *lfsck  = com->lc_lfsck;
967         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
968         struct dt_device                *dt     = lfsck->li_bottom;
969         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
970         struct lfsck_layout_seq         *lls;
971         struct thandle                  *th;
972         __u64                            lastid;
973         int                              rc     = 0;
974         int                              rc1    = 0;
975
976         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
977                 loff_t pos = 0;
978
979                 if (!lls->lls_dirty)
980                         continue;
981
982                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
983                        "<seq> "LPX64" as <oid> "LPU64"\n",
984                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
985
986                 if (bk->lb_param & LPF_DRYRUN) {
987                         lls->lls_dirty = 0;
988                         continue;
989                 }
990
991                 th = dt_trans_create(env, dt);
992                 if (IS_ERR(th)) {
993                         rc1 = PTR_ERR(th);
994                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
995                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
996                                lfsck_lfsck2name(com->lc_lfsck),
997                                lls->lls_seq, rc1);
998                         continue;
999                 }
1000
1001                 lastid = cpu_to_le64(lls->lls_lastid);
1002                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1003                                              lfsck_buf_get(env, &lastid,
1004                                                            sizeof(lastid)),
1005                                              pos, th);
1006                 if (rc != 0)
1007                         goto stop;
1008
1009                 rc = dt_trans_start_local(env, dt, th);
1010                 if (rc != 0)
1011                         goto stop;
1012
1013                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1014                 rc = dt_record_write(env, lls->lls_lastid_obj,
1015                                      lfsck_buf_get(env, &lastid,
1016                                      sizeof(lastid)), &pos, th);
1017                 dt_write_unlock(env, lls->lls_lastid_obj);
1018                 if (rc == 0)
1019                         lls->lls_dirty = 0;
1020
1021 stop:
1022                 dt_trans_stop(env, dt, th);
1023                 if (rc != 0) {
1024                         rc1 = rc;
1025                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1026                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1027                                lfsck_lfsck2name(com->lc_lfsck),
1028                                lls->lls_seq, rc1);
1029                 }
1030         }
1031
1032         return rc1;
1033 }
1034
1035 static int
1036 lfsck_layout_lastid_load(const struct lu_env *env,
1037                          struct lfsck_component *com,
1038                          struct lfsck_layout_seq *lls)
1039 {
1040         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1041         struct lfsck_layout     *lo     = com->lc_file_ram;
1042         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1043         struct dt_object        *obj;
1044         loff_t                   pos    = 0;
1045         int                      rc;
1046         ENTRY;
1047
1048         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1049         obj = dt_locate(env, lfsck->li_bottom, fid);
1050         if (IS_ERR(obj))
1051                 RETURN(PTR_ERR(obj));
1052
1053         /* LAST_ID crashed, to be rebuilt */
1054         if (dt_object_exists(obj) == 0) {
1055                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1056                         LASSERT(lfsck->li_out_notify != NULL);
1057
1058                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1059                                              LE_LASTID_REBUILDING);
1060                         lo->ll_flags |= LF_CRASHED_LASTID;
1061
1062                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1063                                "LAST_ID file for sequence "LPX64"\n",
1064                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1065
1066                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1067                             cfs_fail_val > 0) {
1068                                 struct l_wait_info lwi = LWI_TIMEOUT(
1069                                                 cfs_time_seconds(cfs_fail_val),
1070                                                 NULL, NULL);
1071
1072                                 up_write(&com->lc_sem);
1073                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1074                                              !thread_is_running(&lfsck->li_thread),
1075                                              &lwi);
1076                                 down_write(&com->lc_sem);
1077                         }
1078                 }
1079
1080                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1081         } else {
1082                 dt_read_lock(env, obj, 0);
1083                 rc = dt_read(env, obj,
1084                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1085                         &pos);
1086                 dt_read_unlock(env, obj);
1087                 if (rc != 0 && rc != sizeof(__u64))
1088                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1089
1090                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1091                         LASSERT(lfsck->li_out_notify != NULL);
1092
1093                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1094                                              LE_LASTID_REBUILDING);
1095                         lo->ll_flags |= LF_CRASHED_LASTID;
1096
1097                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1098                                "LAST_ID file for the sequence "LPX64
1099                                ": rc = %d\n",
1100                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1101                 }
1102
1103                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1104                 rc = 0;
1105         }
1106
1107         GOTO(out, rc);
1108
1109 out:
1110         if (rc != 0)
1111                 lfsck_object_put(env, obj);
1112         else
1113                 lls->lls_lastid_obj = obj;
1114
1115         return rc;
1116 }
1117
1118 static void lfsck_layout_record_failure(const struct lu_env *env,
1119                                         struct lfsck_instance *lfsck,
1120                                         struct lfsck_layout *lo)
1121 {
1122         __u64 cookie;
1123
1124         lo->ll_objs_failed_phase1++;
1125         cookie = lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1126                                                         lfsck->li_di_oit);
1127         if (lo->ll_pos_first_inconsistent == 0 ||
1128             lo->ll_pos_first_inconsistent < cookie) {
1129                 lo->ll_pos_first_inconsistent = cookie;
1130
1131                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1132                        "inconsistency at the pos ["LPU64"]\n",
1133                        lfsck_lfsck2name(lfsck),
1134                        lo->ll_pos_first_inconsistent);
1135         }
1136 }
1137
1138 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1139                                            struct lfsck_component *com,
1140                                            int rc)
1141 {
1142         struct lfsck_instance   *lfsck = com->lc_lfsck;
1143         struct lfsck_layout     *lo    = com->lc_file_ram;
1144
1145         down_write(&com->lc_sem);
1146         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1147                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1148         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1149         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1150
1151         if (rc > 0) {
1152                 com->lc_journal = 0;
1153                 if (lo->ll_flags & LF_INCOMPLETE)
1154                         lo->ll_status = LS_PARTIAL;
1155                 else
1156                         lo->ll_status = LS_COMPLETED;
1157                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1158                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1159                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1160                 lo->ll_success_count++;
1161         } else if (rc == 0) {
1162                 lo->ll_status = lfsck->li_status;
1163                 if (lo->ll_status == 0)
1164                         lo->ll_status = LS_STOPPED;
1165         } else {
1166                 lo->ll_status = LS_FAILED;
1167         }
1168
1169         rc = lfsck_layout_store(env, com);
1170         up_write(&com->lc_sem);
1171
1172         return rc;
1173 }
1174
1175 static int lfsck_layout_trans_stop(const struct lu_env *env,
1176                                    struct dt_device *dev,
1177                                    struct thandle *handle, int result)
1178 {
1179         int rc;
1180
1181         handle->th_result = result;
1182         rc = dt_trans_stop(env, dev, handle);
1183         if (rc > 0)
1184                 rc = 0;
1185         else if (rc == 0)
1186                 rc = 1;
1187
1188         return rc;
1189 }
1190
1191 /**
1192  * Get the system default stripe size.
1193  *
1194  * \param[in] env       pointer to the thread context
1195  * \param[in] lfsck     pointer to the lfsck instance
1196  * \param[out] size     pointer to the default stripe size
1197  *
1198  * \retval              0 for success
1199  * \retval              negative error number on failure
1200  */
1201 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1202                                            struct lfsck_instance *lfsck,
1203                                            __u32 *size)
1204 {
1205         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1206         struct dt_object        *root;
1207         int                      rc;
1208
1209         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1210         if (IS_ERR(root))
1211                 return PTR_ERR(root);
1212
1213         /* Get the default stripe size via xattr_get on the backend root. */
1214         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1215                           XATTR_NAME_LOV, BYPASS_CAPA);
1216         if (rc > 0) {
1217                 /* The lum->lmm_stripe_size is LE mode. The *size also
1218                  * should be LE mode. So it is unnecessary to convert. */
1219                 *size = lum->lmm_stripe_size;
1220                 rc = 0;
1221         } else if (unlikely(rc == 0)) {
1222                 rc = -EINVAL;
1223         }
1224
1225         lfsck_object_put(env, root);
1226
1227         return rc;
1228 }
1229
1230 /**
1231  * \retval       +1: repaired
1232  * \retval        0: did nothing
1233  * \retval      -ve: on error
1234  */
1235 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1236                                      struct thandle *handle,
1237                                      struct dt_object *parent,
1238                                      struct lu_fid *cfid,
1239                                      struct lu_buf *buf,
1240                                      struct lov_ost_data_v1 *slot,
1241                                      int fl, __u32 ost_idx)
1242 {
1243         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1244         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1245         struct lu_buf            ea_buf;
1246         int                      rc;
1247         __u32                    magic;
1248         __u16                    count;
1249
1250         magic = le32_to_cpu(lmm->lmm_magic);
1251         count = le16_to_cpu(lmm->lmm_stripe_count);
1252
1253         fid_to_ostid(cfid, oi);
1254         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1255         slot->l_ost_gen = cpu_to_le32(0);
1256         slot->l_ost_idx = cpu_to_le32(ost_idx);
1257
1258         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1259                 struct lov_ost_data_v1 *objs;
1260                 int                     i;
1261
1262                 if (magic == LOV_MAGIC_V1)
1263                         objs = &lmm->lmm_objects[0];
1264                 else
1265                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1266                 for (i = 0; i < count; i++, objs++) {
1267                         if (objs != slot && lovea_slot_is_dummy(objs))
1268                                 break;
1269                 }
1270
1271                 /* If the @slot is the last dummy slot to be refilled,
1272                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1273                 if (i == count)
1274                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1275         }
1276
1277         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1278         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1279                           BYPASS_CAPA);
1280         if (rc == 0)
1281                 rc = 1;
1282
1283         return rc;
1284 }
1285
1286 /**
1287  * \retval       +1: repaired
1288  * \retval        0: did nothing
1289  * \retval      -ve: on error
1290  */
1291 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1292                                      struct lfsck_instance *lfsck,
1293                                      struct thandle *handle,
1294                                      struct dt_object *parent,
1295                                      struct lu_fid *cfid,
1296                                      struct lu_buf *buf, int fl,
1297                                      __u32 ost_idx, __u32 ea_off, bool reset)
1298 {
1299         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1300         struct lov_ost_data_v1  *objs;
1301         int                      rc;
1302         __u16                    count;
1303         bool                     hole   = false;
1304         ENTRY;
1305
1306         if (fl == LU_XATTR_CREATE || reset) {
1307                 __u32 pattern = LOV_PATTERN_RAID0;
1308
1309                 count = ea_off + 1;
1310                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1311
1312                 if (ea_off != 0 || reset) {
1313                         pattern |= LOV_PATTERN_F_HOLE;
1314                         hole = true;
1315                 }
1316
1317                 memset(lmm, 0, buf->lb_len);
1318                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1319                 lmm->lmm_pattern = cpu_to_le32(pattern);
1320                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1321                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1322
1323                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1324                                                      &lmm->lmm_stripe_size);
1325                 if (rc != 0)
1326                         RETURN(rc);
1327
1328                 objs = &lmm->lmm_objects[ea_off];
1329         } else {
1330                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1331                 int     gap;
1332
1333                 count = le16_to_cpu(lmm->lmm_stripe_count);
1334                 if (magic == LOV_MAGIC_V1)
1335                         objs = &lmm->lmm_objects[count];
1336                 else
1337                         objs = &((struct lov_mds_md_v3 *)lmm)->
1338                                                         lmm_objects[count];
1339
1340                 gap = ea_off - count;
1341                 if (gap >= 0)
1342                         count = ea_off + 1;
1343                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1344
1345                 if (gap > 0) {
1346                         memset(objs, 0, gap * sizeof(*objs));
1347                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1348                         hole = true;
1349                 }
1350
1351                 lmm->lmm_layout_gen =
1352                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1353                 objs += gap;
1354         }
1355
1356         lmm->lmm_stripe_count = cpu_to_le16(count);
1357         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1358                                        fl, ost_idx);
1359
1360         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1361                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1362                "reset %s, %s LOV EA hole: rc = %d\n",
1363                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1364                ost_idx, ea_off, fl, reset ? "yes" : "no",
1365                hole ? "with" : "without", rc);
1366
1367         RETURN(rc);
1368 }
1369
1370 /**
1371  * \retval       +1: repaired
1372  * \retval        0: did nothing
1373  * \retval      -ve: on error
1374  */
1375 static int lfsck_layout_update_pfid(const struct lu_env *env,
1376                                     struct lfsck_component *com,
1377                                     struct dt_object *parent,
1378                                     struct lu_fid *cfid,
1379                                     struct dt_device *cdev, __u32 ea_off)
1380 {
1381         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1382         struct dt_object        *child;
1383         struct thandle          *handle;
1384         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1385         struct lu_buf           *buf;
1386         int                      rc     = 0;
1387         ENTRY;
1388
1389         child = lfsck_object_find_by_dev(env, cdev, cfid);
1390         if (IS_ERR(child))
1391                 RETURN(PTR_ERR(child));
1392
1393         handle = dt_trans_create(env, cdev);
1394         if (IS_ERR(handle))
1395                 GOTO(out, rc = PTR_ERR(handle));
1396
1397         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1398         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1399         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1400          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1401          * parent MDT-object's layout EA. */
1402         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1403         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1404
1405         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1406         if (rc != 0)
1407                 GOTO(stop, rc);
1408
1409         rc = dt_trans_start(env, cdev, handle);
1410         if (rc != 0)
1411                 GOTO(stop, rc);
1412
1413         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1414                           BYPASS_CAPA);
1415
1416         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1417
1418 stop:
1419         dt_trans_stop(env, cdev, handle);
1420
1421 out:
1422         lu_object_put(env, &child->do_lu);
1423
1424         return rc;
1425 }
1426
1427 /**
1428  * This function will create the MDT-object with the given (partial) LOV EA.
1429  *
1430  * Under some data corruption cases, the MDT-object of the file may be lost,
1431  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1432  * re-create the MDT-object with the orphan OST-object(s) information.
1433  *
1434  * On the other hand, the LFSCK may have created some OST-object for repairing
1435  * a dangling LOV EA reference, but as the LFSCK proceeds, it may find that
1436  * the old OST-object is there and should replace the newly created OST
1437  * object. Unfortunately, some others have modified such newly created object.
1438  * To keep the data (both new and old), the LFSCK will create an MDT-object with
1439  * a new FID to reference the original OST-object.
1440  *
1441  * \param[in] env       pointer to the thread context
1442  * \param[in] com       pointer to the lfsck component
1443  * \param[in] ltd       pointer to target device descriptor
1444  * \param[in] rec       pointer to the record for the orphan OST-object
1445  * \param[in] cfid      pointer to FID for the orphan OST-object
1446  * \param[in] infix     additional information, such as the FID for original
1447  *                      MDT-object and the stripe offset in the LOV EA
1448  * \param[in] type      the type for describing why the orphan MDT-object is
1449  *                      created. The rules are as following:
1450  *
1451  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1452  *                      same slot in the layout EA. Then the LFSCK will create
1453  *                      new MDT-object(s) to hold the conflict OST-object(s).
1454  *
1455  *  type "N":           The orphan OST-object does not know which one was the
1456  *                      real parent MDT-object, so the LFSCK uses new FID for
1457  *                      its parent MDT-object.
1458  *
1459  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1460  *                      but does not know the position (the file name) in the
1461  *                      layout.
1462  *
1463  * The orphan name will be like:
1464  * ${FID}-${infix}-${type}-${conflict_version}
1465  *
1466  * \param[in] ea_off    the stripe offset in the LOV EA
1467  *
1468  * \retval              positive on repaired something
1469  * \retval              0 if needs to repair nothing
1470  * \retval              negative error number on failure
1471  */
1472 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1473                                         struct lfsck_component *com,
1474                                         struct lfsck_tgt_desc *ltd,
1475                                         struct lu_orphan_rec *rec,
1476                                         struct lu_fid *cfid,
1477                                         const char *infix,
1478                                         const char *type,
1479                                         __u32 ea_off)
1480 {
1481         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1482         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1483         char                            *name   = info->lti_key;
1484         struct lu_attr                  *la     = &info->lti_la;
1485         struct dt_object_format         *dof    = &info->lti_dof;
1486         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1487         struct lu_fid                   *pfid   = &rec->lor_fid;
1488         struct lu_fid                   *tfid   = &info->lti_fid3;
1489         struct dt_device                *next   = lfsck->li_next;
1490         struct dt_object                *pobj   = NULL;
1491         struct dt_object                *cobj   = NULL;
1492         struct thandle                  *th     = NULL;
1493         struct lu_buf                    pbuf   = { 0 };
1494         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1495         struct lu_buf                    lov_buf;
1496         struct lustre_handle             lh     = { 0 };
1497         struct linkea_data               ldata  = { 0 };
1498         struct lu_buf                    linkea_buf;
1499         const struct lu_name            *pname;
1500         int                              size   = 0;
1501         int                              idx    = 0;
1502         int                              rc     = 0;
1503         ENTRY;
1504
1505         /* Create .lustre/lost+found/MDTxxxx when needed. */
1506         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1507                 rc = lfsck_create_lpf(env, lfsck);
1508                 if (rc != 0)
1509                         GOTO(log, rc);
1510         }
1511
1512         if (fid_is_zero(pfid)) {
1513                 struct filter_fid *ff = &info->lti_new_pfid;
1514
1515                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1516                 if (rc != 0)
1517                         RETURN(rc);
1518
1519                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1520                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1521                 /* Currently, the filter_fid::ff_parent::f_ver is not the
1522                  * real parent MDT-object's FID::f_ver, instead it is the
1523                  * OST-object index in its parent MDT-object's layout EA. */
1524                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1525                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
1526                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1527                 if (IS_ERR(cobj))
1528                         GOTO(log, rc = PTR_ERR(cobj));
1529         }
1530
1531         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1532         if (IS_ERR(pobj))
1533                 GOTO(put, rc = PTR_ERR(pobj));
1534
1535         LASSERT(infix != NULL);
1536         LASSERT(type != NULL);
1537
1538         do {
1539                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
1540                          type, idx++);
1541                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1542                                (const struct dt_key *)name, BYPASS_CAPA);
1543                 if (rc != 0 && rc != -ENOENT)
1544                         GOTO(put, rc);
1545         } while (rc == 0);
1546
1547         rc = linkea_data_new(&ldata,
1548                              &lfsck_env_info(env)->lti_linkea_buf);
1549         if (rc != 0)
1550                 GOTO(put, rc);
1551
1552         pname = lfsck_name_get_const(env, name, strlen(name));
1553         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
1554         if (rc != 0)
1555                 GOTO(put, rc);
1556
1557         memset(la, 0, sizeof(*la));
1558         la->la_uid = rec->lor_uid;
1559         la->la_gid = rec->lor_gid;
1560         la->la_mode = S_IFREG | S_IRUSR;
1561         la->la_valid = LA_MODE | LA_UID | LA_GID;
1562
1563         memset(dof, 0, sizeof(*dof));
1564         dof->dof_type = dt_mode_to_dft(S_IFREG);
1565
1566         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1567         if (ea_buf->lb_len < size) {
1568                 lu_buf_realloc(ea_buf, size);
1569                 if (ea_buf->lb_buf == NULL)
1570                         GOTO(put, rc = -ENOMEM);
1571         }
1572
1573         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
1574          *
1575          * XXX: Currently, we do not grab the PDO lock as normal create cases,
1576          *      because creating MDT-object for orphan OST-object is rare, we
1577          *      do not much care about the performance. It can be improved in
1578          *      the future when needed. */
1579         rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
1580                               MDS_INODELOCK_UPDATE, LCK_EX);
1581         if (rc != 0)
1582                 GOTO(put, rc);
1583
1584         th = dt_trans_create(env, next);
1585         if (IS_ERR(th))
1586                 GOTO(unlock, rc = PTR_ERR(th));
1587
1588         /* 1a. Update OST-object's parent information remotely.
1589          *
1590          * If other subsequent modifications failed, then next LFSCK scanning
1591          * will process the OST-object as orphan again with known parent FID. */
1592         if (cobj != NULL) {
1593                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
1594                                           0, th);
1595                 if (rc != 0)
1596                         GOTO(stop, rc);
1597         }
1598
1599         /* 2a. Create the MDT-object locally. */
1600         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
1601         if (rc != 0)
1602                 GOTO(stop, rc);
1603
1604         /* 3a. Add layout EA for the MDT-object. */
1605         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
1606         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
1607                                   LU_XATTR_CREATE, th);
1608         if (rc != 0)
1609                 GOTO(stop, rc);
1610
1611         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1612         dtrec->rec_fid = pfid;
1613         dtrec->rec_type = S_IFREG;
1614         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
1615                                (const struct dt_rec *)dtrec,
1616                                (const struct dt_key *)name, th);
1617         if (rc != 0)
1618                 GOTO(stop, rc);
1619
1620         /* 5a. insert linkEA for parent. */
1621         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1622                        ldata.ld_leh->leh_len);
1623         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
1624                                   XATTR_NAME_LINK, 0, th);
1625         if (rc != 0)
1626                 GOTO(stop, rc);
1627
1628         rc = dt_trans_start(env, next, th);
1629         if (rc != 0)
1630                 GOTO(stop, rc);
1631
1632         /* 1b. Update OST-object's parent information remotely. */
1633         if (cobj != NULL) {
1634                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
1635                                   BYPASS_CAPA);
1636                 if (rc != 0)
1637                         GOTO(stop, rc);
1638         }
1639
1640         dt_write_lock(env, pobj, 0);
1641         /* 2b. Create the MDT-object locally. */
1642         rc = dt_create(env, pobj, la, NULL, dof, th);
1643         if (rc == 0)
1644                 /* 3b. Add layout EA for the MDT-object. */
1645                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
1646                                                &lov_buf, LU_XATTR_CREATE,
1647                                                ltd->ltd_index, ea_off, false);
1648         dt_write_unlock(env, pobj);
1649         if (rc < 0)
1650                 GOTO(stop, rc);
1651
1652         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1653         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
1654                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
1655         if (rc != 0)
1656                 GOTO(stop, rc);
1657
1658         /* 5b. insert linkEA for parent. */
1659         rc = dt_xattr_set(env, pobj, &linkea_buf,
1660                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
1661
1662         GOTO(stop, rc);
1663
1664 stop:
1665         dt_trans_stop(env, next, th);
1666
1667 unlock:
1668         lfsck_ibits_unlock(&lh, LCK_EX);
1669
1670 put:
1671         if (cobj != NULL && !IS_ERR(cobj))
1672                 lu_object_put(env, &cobj->do_lu);
1673         if (pobj != NULL && !IS_ERR(pobj))
1674                 lu_object_put(env, &pobj->do_lu);
1675
1676 log:
1677         if (rc < 0)
1678                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
1679                        "recreate the lost MDT-object: parent "DFID
1680                        ", child "DFID", OST-index %u, stripe-index %u, "
1681                        "infix %s, type %s: rc = %d\n",
1682                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
1683                        ltd->ltd_index, ea_off, infix, type, rc);
1684
1685         return rc >= 0 ? 1 : rc;
1686 }
1687
1688 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
1689                                                    struct lfsck_component *com,
1690                                                    const struct lu_fid *fid,
1691                                                    __u32 index)
1692 {
1693         struct lfsck_thread_info *info  = lfsck_env_info(env);
1694         struct lfsck_request     *lr    = &info->lti_lr;
1695         struct lfsck_instance    *lfsck = com->lc_lfsck;
1696         struct lfsck_tgt_desc    *ltd;
1697         struct ptlrpc_request    *req;
1698         struct lfsck_request     *tmp;
1699         struct obd_export        *exp;
1700         int                       rc    = 0;
1701         ENTRY;
1702
1703         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
1704         if (unlikely(ltd == NULL))
1705                 RETURN(-ENXIO);
1706
1707         exp = ltd->ltd_exp;
1708         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
1709                 GOTO(put, rc = -EOPNOTSUPP);
1710
1711         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
1712         if (req == NULL)
1713                 GOTO(put, rc = -ENOMEM);
1714
1715         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
1716         if (rc != 0) {
1717                 ptlrpc_request_free(req);
1718
1719                 GOTO(put, rc);
1720         }
1721
1722         memset(lr, 0, sizeof(*lr));
1723         lr->lr_event = LE_CONDITIONAL_DESTROY;
1724         lr->lr_active = LFSCK_TYPE_LAYOUT;
1725         lr->lr_fid = *fid;
1726
1727         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1728         *tmp = *lr;
1729         ptlrpc_request_set_replen(req);
1730
1731         rc = ptlrpc_queue_wait(req);
1732         ptlrpc_req_finished(req);
1733
1734         GOTO(put, rc);
1735
1736 put:
1737         lfsck_tgt_put(ltd);
1738
1739         return rc;
1740 }
1741
1742 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
1743                                                   struct lfsck_component *com,
1744                                                   struct lfsck_request *lr)
1745 {
1746         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1747         struct lu_attr                  *la     = &info->lti_la;
1748         ldlm_policy_data_t              *policy = &info->lti_policy;
1749         struct ldlm_res_id              *resid  = &info->lti_resid;
1750         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1751         struct dt_device                *dev    = lfsck->li_bottom;
1752         struct lu_fid                   *fid    = &lr->lr_fid;
1753         struct dt_object                *obj;
1754         struct thandle                  *th     = NULL;
1755         struct lustre_handle             lh     = { 0 };
1756         __u64                            flags  = 0;
1757         int                              rc     = 0;
1758         ENTRY;
1759
1760         obj = lfsck_object_find_by_dev(env, dev, fid);
1761         if (IS_ERR(obj))
1762                 RETURN(PTR_ERR(obj));
1763
1764         dt_read_lock(env, obj, 0);
1765         if (dt_object_exists(obj) == 0 ||
1766             lfsck_is_dead_obj(obj)) {
1767                 dt_read_unlock(env, obj);
1768
1769                 GOTO(put, rc = -ENOENT);
1770         }
1771
1772         /* Get obj's attr without lock firstly. */
1773         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1774         dt_read_unlock(env, obj);
1775         if (rc != 0)
1776                 GOTO(put, rc);
1777
1778         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
1779                 GOTO(put, rc = -ETXTBSY);
1780
1781         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
1782         LASSERT(lfsck->li_namespace != NULL);
1783
1784         memset(policy, 0, sizeof(*policy));
1785         policy->l_extent.end = OBD_OBJECT_EOF;
1786         ost_fid_build_resid(fid, resid);
1787         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
1788                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1789                                     ldlm_completion_ast, NULL, NULL, 0,
1790                                     LVB_T_NONE, NULL, &lh);
1791         if (rc != ELDLM_OK)
1792                 GOTO(put, rc = -EIO);
1793
1794         dt_write_lock(env, obj, 0);
1795         /* Get obj's attr within lock again. */
1796         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1797         if (rc != 0)
1798                 GOTO(unlock, rc);
1799
1800         if (la->la_ctime != 0)
1801                 GOTO(unlock, rc = -ETXTBSY);
1802
1803         th = dt_trans_create(env, dev);
1804         if (IS_ERR(th))
1805                 GOTO(unlock, rc = PTR_ERR(th));
1806
1807         rc = dt_declare_ref_del(env, obj, th);
1808         if (rc != 0)
1809                 GOTO(stop, rc);
1810
1811         rc = dt_declare_destroy(env, obj, th);
1812         if (rc != 0)
1813                 GOTO(stop, rc);
1814
1815         rc = dt_trans_start_local(env, dev, th);
1816         if (rc != 0)
1817                 GOTO(stop, rc);
1818
1819         rc = dt_ref_del(env, obj, th);
1820         if (rc != 0)
1821                 GOTO(stop, rc);
1822
1823         rc = dt_destroy(env, obj, th);
1824         if (rc == 0)
1825                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
1826                        "OST-object "DFID" that was created for reparing "
1827                        "dangling referenced case. But the original missed "
1828                        "OST-object is found now.\n",
1829                        lfsck_lfsck2name(lfsck), PFID(fid));
1830
1831         GOTO(stop, rc);
1832
1833 stop:
1834         dt_trans_stop(env, dev, th);
1835
1836 unlock:
1837         dt_write_unlock(env, obj);
1838         ldlm_lock_decref(&lh, LCK_EX);
1839
1840 put:
1841         lu_object_put(env, &obj->do_lu);
1842
1843         return rc;
1844 }
1845
1846 /**
1847  * Some OST-object has occupied the specified layout EA slot.
1848  * Such OST-object may be generated by the LFSCK when repair
1849  * dangling referenced MDT-object, which can be indicated by
1850  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
1851  * is true and such OST-object has not been modified yet, we
1852  * will replace it with the orphan OST-object; otherwise the
1853  * LFSCK will create new MDT-object to reference the orphan.
1854  *
1855  * \retval       +1: repaired
1856  * \retval        0: did nothing
1857  * \retval      -ve: on error
1858  */
1859 static int lfsck_layout_conflict_create(const struct lu_env *env,
1860                                         struct lfsck_component *com,
1861                                         struct lfsck_tgt_desc *ltd,
1862                                         struct lu_orphan_rec *rec,
1863                                         struct dt_object *parent,
1864                                         struct lu_fid *cfid,
1865                                         struct lu_buf *ea_buf,
1866                                         struct lov_ost_data_v1 *slot,
1867                                         __u32 ea_off)
1868 {
1869         struct lfsck_thread_info *info          = lfsck_env_info(env);
1870         struct lu_fid            *cfid2         = &info->lti_fid2;
1871         struct ost_id            *oi            = &info->lti_oi;
1872         char                     *infix         = info->lti_tmpbuf;
1873         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
1874         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
1875         struct thandle           *th            = NULL;
1876         struct lustre_handle      lh            = { 0 };
1877         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
1878         int                       rc            = 0;
1879         ENTRY;
1880
1881         ostid_le_to_cpu(&slot->l_ost_oi, oi);
1882         rc = ostid_to_fid(cfid2, oi, ost_idx2);
1883         if (rc != 0)
1884                 GOTO(out, rc);
1885
1886         /* Hold layout lock on the parent to prevent others to access. */
1887         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
1888                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
1889                               LCK_EX);
1890         if (rc != 0)
1891                 GOTO(out, rc);
1892
1893         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
1894
1895         /* If the conflict OST-obejct is not created for fixing dangling
1896          * referenced MDT-object in former LFSCK check/repair, or it has
1897          * been modified by others, then we cannot destroy it. Re-create
1898          * a new MDT-object for the orphan OST-object. */
1899         if (rc == -ETXTBSY) {
1900                 /* No need the layout lock on the original parent. */
1901                 lfsck_ibits_unlock(&lh, LCK_EX);
1902
1903                 fid_zero(&rec->lor_fid);
1904                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
1905                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
1906                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
1907                                                   infix, "C", ea_off);
1908
1909                 RETURN(rc);
1910         }
1911
1912         if (rc != 0 && rc != -ENOENT)
1913                 GOTO(unlock, rc);
1914
1915         th = dt_trans_create(env, dev);
1916         if (IS_ERR(th))
1917                 GOTO(unlock, rc = PTR_ERR(th));
1918
1919         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
1920                                   LU_XATTR_REPLACE, th);
1921         if (rc != 0)
1922                 GOTO(stop, rc);
1923
1924         rc = dt_trans_start_local(env, dev, th);
1925         if (rc != 0)
1926                 GOTO(stop, rc);
1927
1928         dt_write_lock(env, parent, 0);
1929         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1930         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
1931                                        LU_XATTR_REPLACE, ltd->ltd_index);
1932         dt_write_unlock(env, parent);
1933
1934         GOTO(stop, rc);
1935
1936 stop:
1937         dt_trans_stop(env, dev, th);
1938
1939 unlock:
1940         lfsck_ibits_unlock(&lh, LCK_EX);
1941
1942 out:
1943         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
1944                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
1945                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
1946                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
1947                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
1948                ea_off, rc);
1949
1950         return rc >= 0 ? 1 : rc;
1951 }
1952
1953 /**
1954  * \retval       +1: repaired
1955  * \retval        0: did nothing
1956  * \retval      -ve: on error
1957  */
1958 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
1959                                        struct lfsck_component *com,
1960                                        struct lfsck_tgt_desc *ltd,
1961                                        struct lu_orphan_rec *rec,
1962                                        struct dt_object *parent,
1963                                        struct lu_fid *cfid,
1964                                        __u32 ost_idx, __u32 ea_off)
1965 {
1966         struct lfsck_thread_info *info          = lfsck_env_info(env);
1967         struct lu_buf            *buf           = &info->lti_big_buf;
1968         struct lu_fid            *fid           = &info->lti_fid2;
1969         struct ost_id            *oi            = &info->lti_oi;
1970         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1971         struct dt_device         *dt            = lfsck->li_bottom;
1972         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
1973         struct thandle            *handle       = NULL;
1974         size_t                    lovea_size;
1975         struct lov_mds_md_v1     *lmm;
1976         struct lov_ost_data_v1   *objs;
1977         struct lustre_handle      lh            = { 0 };
1978         __u32                     magic;
1979         int                       fl            = 0;
1980         int                       rc            = 0;
1981         int                       rc1;
1982         int                       i;
1983         __u16                     count;
1984         bool                      locked        = false;
1985         ENTRY;
1986
1987         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1988                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
1989                               LCK_EX);
1990         if (rc != 0) {
1991                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
1992                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
1993                        "stripe-index %u: rc = %d\n",
1994                        lfsck_lfsck2name(lfsck), PFID(cfid),
1995                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
1996
1997                 RETURN(rc);
1998         }
1999
2000 again:
2001         if (locked) {
2002                 dt_write_unlock(env, parent);
2003                 locked = false;
2004         }
2005
2006         if (handle != NULL) {
2007                 dt_trans_stop(env, dt, handle);
2008                 handle = NULL;
2009         }
2010
2011         if (rc < 0)
2012                 GOTO(unlock_layout, rc);
2013
2014         lovea_size = rc;
2015         if (buf->lb_len < lovea_size) {
2016                 lu_buf_realloc(buf, lovea_size);
2017                 if (buf->lb_buf == NULL)
2018                         GOTO(unlock_layout, rc = -ENOMEM);
2019         }
2020
2021         if (!(bk->lb_param & LPF_DRYRUN)) {
2022                 handle = dt_trans_create(env, dt);
2023                 if (IS_ERR(handle))
2024                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2025
2026                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2027                                           fl, handle);
2028                 if (rc != 0)
2029                         GOTO(stop, rc);
2030
2031                 rc = dt_trans_start_local(env, dt, handle);
2032                 if (rc != 0)
2033                         GOTO(stop, rc);
2034         }
2035
2036         dt_write_lock(env, parent, 0);
2037         locked = true;
2038         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2039         if (rc == -ERANGE) {
2040                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2041                                   BYPASS_CAPA);
2042                 LASSERT(rc != 0);
2043                 goto again;
2044         } else if (rc == -ENODATA || rc == 0) {
2045                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2046                 /* If the declared is not big enough, re-try. */
2047                 if (buf->lb_len < lovea_size) {
2048                         rc = lovea_size;
2049                         goto again;
2050                 }
2051                 fl = LU_XATTR_CREATE;
2052         } else if (rc < 0) {
2053                 GOTO(unlock_parent, rc);
2054         } else if (unlikely(buf->lb_len == 0)) {
2055                 goto again;
2056         } else {
2057                 fl = LU_XATTR_REPLACE;
2058                 lovea_size = rc;
2059         }
2060
2061         if (fl == LU_XATTR_CREATE) {
2062                 if (bk->lb_param & LPF_DRYRUN)
2063                         GOTO(unlock_parent, rc = 1);
2064
2065                 LASSERT(buf->lb_len >= lovea_size);
2066
2067                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2068                                                buf, fl, ost_idx, ea_off, false);
2069
2070                 GOTO(unlock_parent, rc);
2071         }
2072
2073         lmm = buf->lb_buf;
2074         rc1 = lfsck_layout_verify_header(lmm);
2075
2076         /* If the LOV EA crashed, the rebuild it. */
2077         if (rc1 == -EINVAL) {
2078                 if (bk->lb_param & LPF_DRYRUN)
2079                         GOTO(unlock_parent, rc = 1);
2080
2081                 LASSERT(buf->lb_len >= lovea_size);
2082
2083                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2084                                                buf, fl, ost_idx, ea_off, true);
2085
2086                 GOTO(unlock_parent, rc);
2087         }
2088
2089         /* For other unknown magic/pattern, keep the current LOV EA. */
2090         if (rc1 != 0)
2091                 GOTO(unlock_parent, rc = rc1);
2092
2093         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2094          * been verified in lfsck_layout_verify_header() already. If some
2095          * new magic introduced in the future, then layout LFSCK needs to
2096          * be updated also. */
2097         magic = le32_to_cpu(lmm->lmm_magic);
2098         if (magic == LOV_MAGIC_V1) {
2099                 objs = &lmm->lmm_objects[0];
2100         } else {
2101                 LASSERT(magic == LOV_MAGIC_V3);
2102                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2103         }
2104
2105         count = le16_to_cpu(lmm->lmm_stripe_count);
2106         if (count == 0)
2107                 GOTO(unlock_parent, rc = -EINVAL);
2108         LASSERT(count > 0);
2109
2110         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2111         if (count <= ea_off) {
2112                 if (bk->lb_param & LPF_DRYRUN)
2113                         GOTO(unlock_parent, rc = 1);
2114
2115                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2116                 /* If the declared is not big enough, re-try. */
2117                 if (buf->lb_len < lovea_size) {
2118                         rc = lovea_size;
2119                         goto again;
2120                 }
2121
2122                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2123                                                buf, fl, ost_idx, ea_off, false);
2124
2125                 GOTO(unlock_parent, rc);
2126         }
2127
2128         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2129
2130         for (i = 0; i < count; i++, objs++) {
2131                 /* The MDT-object was created via lfsck_layout_recover_create()
2132                  * by others before, and we fill the dummy layout EA. */
2133                 if (lovea_slot_is_dummy(objs)) {
2134                         if (i != ea_off)
2135                                 continue;
2136
2137                         if (bk->lb_param & LPF_DRYRUN)
2138                                 GOTO(unlock_parent, rc = 1);
2139
2140                         lmm->lmm_layout_gen =
2141                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2142                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2143                                                        cfid, buf, objs, fl,
2144                                                        ost_idx);
2145
2146                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2147                                "dummy layout slot for "DFID": parent "DFID
2148                                ", OST-index %u, stripe-index %u: rc = %d\n",
2149                                lfsck_lfsck2name(lfsck), PFID(cfid),
2150                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2151
2152                         GOTO(unlock_parent, rc);
2153                 }
2154
2155                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2156                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2157                 if (rc != 0) {
2158                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2159                                "invalid layout EA at the slot %d, index %u\n",
2160                                lfsck_lfsck2name(lfsck),
2161                                PFID(lfsck_dto2fid(parent)), i,
2162                                le32_to_cpu(objs->l_ost_idx));
2163
2164                         GOTO(unlock_parent, rc);
2165                 }
2166
2167                 /* It should be rare case, the slot is there, but the LFSCK
2168                  * does not handle it during the first-phase cycle scanning. */
2169                 if (unlikely(lu_fid_eq(fid, cfid))) {
2170                         if (i == ea_off) {
2171                                 GOTO(unlock_parent, rc = 0);
2172                         } else {
2173                                 /* Rare case that the OST-object index
2174                                  * does not match the parent MDT-object
2175                                  * layout EA. We trust the later one. */
2176                                 if (bk->lb_param & LPF_DRYRUN)
2177                                         GOTO(unlock_parent, rc = 1);
2178
2179                                 dt_write_unlock(env, parent);
2180                                 if (handle != NULL)
2181                                         dt_trans_stop(env, dt, handle);
2182                                 lfsck_ibits_unlock(&lh, LCK_EX);
2183                                 rc = lfsck_layout_update_pfid(env, com, parent,
2184                                                         cfid, ltd->ltd_tgt, i);
2185
2186                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2187                                        "updated OST-object's pfid for "DFID
2188                                        ": parent "DFID", OST-index %u, "
2189                                        "stripe-index %u: rc = %d\n",
2190                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2191                                        PFID(lfsck_dto2fid(parent)),
2192                                        ltd->ltd_index, i, rc);
2193
2194                                 RETURN(rc);
2195                         }
2196                 }
2197         }
2198
2199         /* The MDT-object exists, but related layout EA slot is occupied
2200          * by others. */
2201         if (bk->lb_param & LPF_DRYRUN)
2202                 GOTO(unlock_parent, rc = 1);
2203
2204         dt_write_unlock(env, parent);
2205         if (handle != NULL)
2206                 dt_trans_stop(env, dt, handle);
2207         lfsck_ibits_unlock(&lh, LCK_EX);
2208         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2209                 objs = &lmm->lmm_objects[ea_off];
2210         else
2211                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2212         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2213                                           buf, objs, ea_off);
2214
2215         RETURN(rc);
2216
2217 unlock_parent:
2218         if (locked)
2219                 dt_write_unlock(env, parent);
2220
2221 stop:
2222         if (handle != NULL)
2223                 dt_trans_stop(env, dt, handle);
2224
2225 unlock_layout:
2226         lfsck_ibits_unlock(&lh, LCK_EX);
2227
2228         return rc;
2229 }
2230
2231 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2232                                         struct lfsck_component *com,
2233                                         struct lfsck_tgt_desc *ltd,
2234                                         struct lu_orphan_rec *rec,
2235                                         struct lu_fid *cfid)
2236 {
2237         struct lfsck_layout     *lo     = com->lc_file_ram;
2238         struct lu_fid           *pfid   = &rec->lor_fid;
2239         struct dt_object        *parent = NULL;
2240         __u32                    ea_off = pfid->f_stripe_idx;
2241         int                      rc     = 0;
2242         ENTRY;
2243
2244         if (!fid_is_sane(cfid))
2245                 GOTO(out, rc = -EINVAL);
2246
2247         if (fid_is_zero(pfid)) {
2248                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2249                                                   "", "N", ea_off);
2250                 GOTO(out, rc);
2251         }
2252
2253         pfid->f_ver = 0;
2254         if (!fid_is_sane(pfid))
2255                 GOTO(out, rc = -EINVAL);
2256
2257         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2258         if (IS_ERR(parent))
2259                 GOTO(out, rc = PTR_ERR(parent));
2260
2261         if (unlikely(dt_object_remote(parent) != 0))
2262                 GOTO(put, rc = -EXDEV);
2263
2264         if (dt_object_exists(parent) == 0) {
2265                 lu_object_put(env, &parent->do_lu);
2266                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2267                                                   "", "R", ea_off);
2268                 GOTO(out, rc);
2269         }
2270
2271         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2272                 GOTO(put, rc = -EISDIR);
2273
2274         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2275                                          ltd->ltd_index, ea_off);
2276
2277         GOTO(put, rc);
2278
2279 put:
2280         if (rc <= 0)
2281                 lu_object_put(env, &parent->do_lu);
2282         else
2283                 /* The layout EA is changed, need to be reloaded next time. */
2284                 lu_object_put_nocache(env, &parent->do_lu);
2285
2286 out:
2287         down_write(&com->lc_sem);
2288         com->lc_new_scanned++;
2289         com->lc_new_checked++;
2290         if (rc > 0) {
2291                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2292                 rc = 0;
2293         } else if (rc < 0) {
2294                 lo->ll_objs_failed_phase2++;
2295         }
2296         up_write(&com->lc_sem);
2297
2298         return rc;
2299 }
2300
2301 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2302                                     struct lfsck_component *com,
2303                                     struct lfsck_tgt_desc *ltd)
2304 {
2305         struct lfsck_layout             *lo     = com->lc_file_ram;
2306         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2307         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2308         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2309         struct ost_id                   *oi     = &info->lti_oi;
2310         struct lu_fid                   *fid    = &info->lti_fid;
2311         struct dt_object                *obj;
2312         const struct dt_it_ops          *iops;
2313         struct dt_it                    *di;
2314         int                              rc     = 0;
2315         ENTRY;
2316
2317         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2318                "scanning for OST%04x\n",
2319                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2320
2321         ostid_set_seq(oi, FID_SEQ_IDIF);
2322         ostid_set_id(oi, 0);
2323         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2324         if (rc != 0)
2325                 GOTO(log, rc);
2326
2327         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2328         if (unlikely(IS_ERR(obj)))
2329                 GOTO(log, rc = PTR_ERR(obj));
2330
2331         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2332         if (rc != 0)
2333                 GOTO(put, rc);
2334
2335         iops = &obj->do_index_ops->dio_it;
2336         di = iops->init(env, obj, 0, BYPASS_CAPA);
2337         if (IS_ERR(di))
2338                 GOTO(put, rc = PTR_ERR(di));
2339
2340         rc = iops->load(env, di, 0);
2341         if (rc == -ESRCH) {
2342                 /* -ESRCH means that the orphan OST-objects rbtree has been
2343                  * cleanup because of the OSS server restart or other errors. */
2344                 lo->ll_flags |= LF_INCOMPLETE;
2345                 GOTO(fini, rc);
2346         }
2347
2348         if (rc == 0)
2349                 rc = iops->next(env, di);
2350         else if (rc > 0)
2351                 rc = 0;
2352
2353         if (rc < 0)
2354                 GOTO(fini, rc);
2355
2356         if (rc > 0)
2357                 GOTO(fini, rc = 0);
2358
2359         do {
2360                 struct dt_key           *key;
2361                 struct lu_orphan_rec    *rec = &info->lti_rec;
2362
2363                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2364                     cfs_fail_val > 0) {
2365                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2366                         struct l_wait_info       lwi;
2367
2368                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2369                                           NULL, NULL);
2370                         l_wait_event(thread->t_ctl_waitq,
2371                                      !thread_is_running(thread),
2372                                      &lwi);
2373                 }
2374
2375                 key = iops->key(env, di);
2376                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2377                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2378                 if (rc == 0)
2379                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2380                                         &com->lc_fid_latest_scanned_phase2);
2381                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2382                         GOTO(fini, rc);
2383
2384                 lfsck_control_speed_by_self(com);
2385                 do {
2386                         rc = iops->next(env, di);
2387                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2388         } while (rc == 0);
2389
2390         GOTO(fini, rc);
2391
2392 fini:
2393         iops->put(env, di);
2394         iops->fini(env, di);
2395 put:
2396         lu_object_put(env, &obj->do_lu);
2397
2398 log:
2399         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2400                "scanning for OST%04x: rc = %d\n",
2401                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2402
2403         return rc > 0 ? 0 : rc;
2404 }
2405
2406 /* For the MDT-object with dangling reference, we need to repare the
2407  * inconsistency according to the LFSCK sponsor's requirement:
2408  *
2409  * 1) Keep the inconsistency there and report the inconsistency case,
2410  *    then give the chance to the application to find related issues,
2411  *    and the users can make the decision about how to handle it with
2412  *    more human knownledge. (by default)
2413  *
2414  * 2) Re-create the missed OST-object with the FID/owner information. */
2415 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2416                                         struct lfsck_component *com,
2417                                         struct lfsck_layout_req *llr,
2418                                         const struct lu_attr *pla)
2419 {
2420         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2421         struct filter_fid               *pfid   = &info->lti_new_pfid;
2422         struct dt_allocation_hint       *hint   = &info->lti_hint;
2423         struct lu_attr                  *cla    = &info->lti_la2;
2424         struct dt_object                *parent = llr->llr_parent->llo_obj;
2425         struct dt_object                *child  = llr->llr_child;
2426         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2427         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2428         struct thandle                  *handle;
2429         struct lu_buf                   *buf;
2430         struct lustre_handle             lh     = { 0 };
2431         int                              rc;
2432         bool                             create;
2433         ENTRY;
2434
2435         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2436                 create = true;
2437         else
2438                 create = false;
2439
2440         if (!create)
2441                 GOTO(log, rc = 1);
2442
2443         memset(cla, 0, sizeof(*cla));
2444         cla->la_uid = pla->la_uid;
2445         cla->la_gid = pla->la_gid;
2446         cla->la_mode = S_IFREG | 0666;
2447         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2448                         LA_ATIME | LA_MTIME | LA_CTIME;
2449
2450         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2451                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2452                               LCK_EX);
2453         if (rc != 0)
2454                 GOTO(log, rc);
2455
2456         handle = dt_trans_create(env, dev);
2457         if (IS_ERR(handle))
2458                 GOTO(unlock1, rc = PTR_ERR(handle));
2459
2460         hint->dah_parent = NULL;
2461         hint->dah_mode = 0;
2462         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2463         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2464         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2465          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2466          * parent MDT-object's layout EA. */
2467         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2468         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2469
2470         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2471         if (rc != 0)
2472                 GOTO(stop, rc);
2473
2474         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2475                                   LU_XATTR_CREATE, handle);
2476         if (rc != 0)
2477                 GOTO(stop, rc);
2478
2479         rc = dt_trans_start(env, dev, handle);
2480         if (rc != 0)
2481                 GOTO(stop, rc);
2482
2483         dt_read_lock(env, parent, 0);
2484         if (unlikely(lfsck_is_dead_obj(parent)))
2485                 GOTO(unlock2, rc = 1);
2486
2487         rc = dt_create(env, child, cla, hint, NULL, handle);
2488         if (rc != 0)
2489                 GOTO(unlock2, rc);
2490
2491         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2492                           handle, BYPASS_CAPA);
2493
2494         GOTO(unlock2, rc);
2495
2496 unlock2:
2497         dt_read_unlock(env, parent);
2498
2499 stop:
2500         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2501
2502 unlock1:
2503         lfsck_ibits_unlock(&lh, LCK_EX);
2504
2505 log:
2506         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2507                "reference for: parent "DFID", child "DFID", OST-index %u, "
2508                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2509                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2510                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2511                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2512                create ? "Create the lost OST-object as required" :
2513                         "Keep the MDT-object there by default", rc);
2514
2515         return rc;
2516 }
2517
2518 /* If the OST-object does not recognize the MDT-object as its parent, and
2519  * there is no other MDT-object claims as its parent, then just trust the
2520  * given MDT-object as its parent. So update the OST-object filter_fid. */
2521 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2522                                               struct lfsck_component *com,
2523                                               struct lfsck_layout_req *llr,
2524                                               const struct lu_attr *pla)
2525 {
2526         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2527         struct filter_fid               *pfid   = &info->lti_new_pfid;
2528         struct lu_attr                  *tla    = &info->lti_la3;
2529         struct dt_object                *parent = llr->llr_parent->llo_obj;
2530         struct dt_object                *child  = llr->llr_child;
2531         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2532         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2533         struct thandle                  *handle;
2534         struct lu_buf                   *buf;
2535         struct lustre_handle             lh     = { 0 };
2536         int                              rc;
2537         ENTRY;
2538
2539         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2540                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2541                               LCK_EX);
2542         if (rc != 0)
2543                 GOTO(log, rc);
2544
2545         handle = dt_trans_create(env, dev);
2546         if (IS_ERR(handle))
2547                 GOTO(unlock1, rc = PTR_ERR(handle));
2548
2549         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2550         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2551         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2552          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2553          * parent MDT-object's layout EA. */
2554         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2555         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2556
2557         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
2558         if (rc != 0)
2559                 GOTO(stop, rc);
2560
2561         tla->la_valid = LA_UID | LA_GID;
2562         tla->la_uid = pla->la_uid;
2563         tla->la_gid = pla->la_gid;
2564         rc = dt_declare_attr_set(env, child, tla, handle);
2565         if (rc != 0)
2566                 GOTO(stop, rc);
2567
2568         rc = dt_trans_start(env, dev, handle);
2569         if (rc != 0)
2570                 GOTO(stop, rc);
2571
2572         dt_write_lock(env, parent, 0);
2573         if (unlikely(lfsck_is_dead_obj(parent)))
2574                 GOTO(unlock2, rc = 1);
2575
2576         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2577                           BYPASS_CAPA);
2578         if (rc != 0)
2579                 GOTO(unlock2, rc);
2580
2581         /* Get the latest parent's owner. */
2582         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2583         if (rc != 0)
2584                 GOTO(unlock2, rc);
2585
2586         tla->la_valid = LA_UID | LA_GID;
2587         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2588
2589         GOTO(unlock2, rc);
2590
2591 unlock2:
2592         dt_write_unlock(env, parent);
2593
2594 stop:
2595         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2596
2597 unlock1:
2598         lfsck_ibits_unlock(&lh, LCK_EX);
2599
2600 log:
2601         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
2602                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
2603                "stripe-index %u, owner %u/%u: rc = %d\n",
2604                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2605                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
2606                pla->la_uid, pla->la_gid, rc);
2607
2608         return rc;
2609 }
2610
2611 /* If there are more than one MDT-objects claim as the OST-object's parent,
2612  * and the OST-object only recognizes one of them, then we need to generate
2613  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
2614 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2615                                                    struct lfsck_component *com,
2616                                                    struct lfsck_layout_req *llr,
2617                                                    struct lu_attr *la,
2618                                                    struct lu_buf *buf)
2619 {
2620         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2621         struct dt_allocation_hint       *hint   = &info->lti_hint;
2622         struct dt_object_format         *dof    = &info->lti_dof;
2623         struct dt_device                *pdev   = com->lc_lfsck->li_next;
2624         struct ost_id                   *oi     = &info->lti_oi;
2625         struct dt_object                *parent = llr->llr_parent->llo_obj;
2626         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
2627         struct dt_object                *child  = NULL;
2628         struct lu_device                *d      = &cdev->dd_lu_dev;
2629         struct lu_object                *o      = NULL;
2630         struct thandle                  *handle;
2631         struct lov_mds_md_v1            *lmm;
2632         struct lov_ost_data_v1          *objs;
2633         struct lustre_handle             lh     = { 0 };
2634         struct lu_buf                    ea_buf;
2635         __u32                            magic;
2636         int                              rc;
2637         ENTRY;
2638
2639         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2640                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2641                               LCK_EX);
2642         if (rc != 0)
2643                 GOTO(log, rc);
2644
2645         handle = dt_trans_create(env, pdev);
2646         if (IS_ERR(handle))
2647                 GOTO(unlock1, rc = PTR_ERR(handle));
2648
2649         o = lu_object_anon(env, d, NULL);
2650         if (IS_ERR(o))
2651                 GOTO(stop, rc = PTR_ERR(o));
2652
2653         child = container_of(o, struct dt_object, do_lu);
2654         o = lu_object_locate(o->lo_header, d->ld_type);
2655         if (unlikely(o == NULL))
2656                 GOTO(stop, rc = -EINVAL);
2657
2658         child = container_of(o, struct dt_object, do_lu);
2659         la->la_valid = LA_UID | LA_GID;
2660         hint->dah_parent = NULL;
2661         hint->dah_mode = 0;
2662         dof->dof_type = DFT_REGULAR;
2663         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2664         if (rc != 0)
2665                 GOTO(stop, rc);
2666
2667         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2668                                   LU_XATTR_REPLACE, handle);
2669         if (rc != 0)
2670                 GOTO(stop, rc);
2671
2672         rc = dt_trans_start(env, pdev, handle);
2673         if (rc != 0)
2674                 GOTO(stop, rc);
2675
2676         dt_write_lock(env, parent, 0);
2677         if (unlikely(lfsck_is_dead_obj(parent)))
2678                 GOTO(unlock2, rc = 0);
2679
2680         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2681         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2682                 GOTO(unlock2, rc = 0);
2683
2684         lmm = buf->lb_buf;
2685         /* Someone change layout during the LFSCK, no need to repair then. */
2686         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2687                 GOTO(unlock2, rc = 0);
2688
2689         rc = dt_create(env, child, la, hint, dof, handle);
2690         if (rc != 0)
2691                 GOTO(unlock2, rc);
2692
2693         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2694          * been verified in lfsck_layout_verify_header() already. If some
2695          * new magic introduced in the future, then layout LFSCK needs to
2696          * be updated also. */
2697         magic = le32_to_cpu(lmm->lmm_magic);
2698         if (magic == LOV_MAGIC_V1) {
2699                 objs = &lmm->lmm_objects[0];
2700         } else {
2701                 LASSERT(magic == LOV_MAGIC_V3);
2702                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2703         }
2704
2705         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2706         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2707         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2708         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2709         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2710         lfsck_buf_init(&ea_buf, lmm,
2711                        lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
2712                                        magic));
2713         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
2714                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2715
2716         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2717
2718 unlock2:
2719         dt_write_unlock(env, parent);
2720
2721 stop:
2722         if (child != NULL)
2723                 lu_object_put(env, &child->do_lu);
2724
2725         dt_trans_stop(env, pdev, handle);
2726
2727 unlock1:
2728         lfsck_ibits_unlock(&lh, LCK_EX);
2729
2730 log:
2731         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
2732                "references for: parent "DFID", OST-index %u, stripe-index %u, "
2733                "owner %u/%u: rc = %d\n",
2734                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2735                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
2736
2737         return rc;
2738 }
2739
2740 /* If the MDT-object and the OST-object have different owner information,
2741  * then trust the MDT-object, because the normal chown/chgrp handle order
2742  * is from MDT to OST, and it is possible that some chown/chgrp operation
2743  * is partly done. */
2744 static int lfsck_layout_repair_owner(const struct lu_env *env,
2745                                      struct lfsck_component *com,
2746                                      struct lfsck_layout_req *llr,
2747                                      struct lu_attr *pla)
2748 {
2749         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2750         struct lu_attr                  *tla    = &info->lti_la3;
2751         struct dt_object                *parent = llr->llr_parent->llo_obj;
2752         struct dt_object                *child  = llr->llr_child;
2753         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2754         struct thandle                  *handle;
2755         int                              rc;
2756         ENTRY;
2757
2758         handle = dt_trans_create(env, dev);
2759         if (IS_ERR(handle))
2760                 GOTO(log, rc = PTR_ERR(handle));
2761
2762         tla->la_uid = pla->la_uid;
2763         tla->la_gid = pla->la_gid;
2764         tla->la_valid = LA_UID | LA_GID;
2765         rc = dt_declare_attr_set(env, child, tla, handle);
2766         if (rc != 0)
2767                 GOTO(stop, rc);
2768
2769         rc = dt_trans_start(env, dev, handle);
2770         if (rc != 0)
2771                 GOTO(stop, rc);
2772
2773         /* Use the dt_object lock to serialize with destroy and attr_set. */
2774         dt_read_lock(env, parent, 0);
2775         if (unlikely(lfsck_is_dead_obj(parent)))
2776                 GOTO(unlock, rc = 1);
2777
2778         /* Get the latest parent's owner. */
2779         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2780         if (rc != 0)
2781                 GOTO(unlock, rc);
2782
2783         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
2784         if (unlikely(tla->la_uid != pla->la_uid ||
2785                      tla->la_gid != pla->la_gid))
2786                 GOTO(unlock, rc = 1);
2787
2788         tla->la_valid = LA_UID | LA_GID;
2789         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2790
2791         GOTO(unlock, rc);
2792
2793 unlock:
2794         dt_read_unlock(env, parent);
2795
2796 stop:
2797         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2798
2799 log:
2800         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
2801                "file owner for: parent "DFID", child "DFID", OST-index %u, "
2802                "stripe-index %u, owner %u/%u: rc = %d\n",
2803                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2804                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
2805                pla->la_uid, pla->la_gid, rc);
2806
2807         return rc;
2808 }
2809
2810 /* Check whether the OST-object correctly back points to the
2811  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
2812 static int lfsck_layout_check_parent(const struct lu_env *env,
2813                                      struct lfsck_component *com,
2814                                      struct dt_object *parent,
2815                                      const struct lu_fid *pfid,
2816                                      const struct lu_fid *cfid,
2817                                      const struct lu_attr *pla,
2818                                      const struct lu_attr *cla,
2819                                      struct lfsck_layout_req *llr,
2820                                      struct lu_buf *lov_ea, __u32 idx)
2821 {
2822         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2823         struct lu_buf                   *buf    = &info->lti_big_buf;
2824         struct dt_object                *tobj;
2825         struct lov_mds_md_v1            *lmm;
2826         struct lov_ost_data_v1          *objs;
2827         int                              rc;
2828         int                              i;
2829         __u32                            magic;
2830         __u16                            count;
2831         ENTRY;
2832
2833         if (fid_is_zero(pfid)) {
2834                 /* client never wrote. */
2835                 if (cla->la_size == 0 && cla->la_blocks == 0) {
2836                         if (unlikely(cla->la_uid != pla->la_uid ||
2837                                      cla->la_gid != pla->la_gid))
2838                                 RETURN (LLIT_INCONSISTENT_OWNER);
2839
2840                         RETURN(0);
2841                 }
2842
2843                 RETURN(LLIT_UNMATCHED_PAIR);
2844         }
2845
2846         if (unlikely(!fid_is_sane(pfid)))
2847                 RETURN(LLIT_UNMATCHED_PAIR);
2848
2849         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
2850                 if (llr->llr_lov_idx == idx)
2851                         RETURN(0);
2852
2853                 RETURN(LLIT_UNMATCHED_PAIR);
2854         }
2855
2856         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
2857         if (IS_ERR(tobj))
2858                 RETURN(PTR_ERR(tobj));
2859
2860         dt_read_lock(env, tobj, 0);
2861         if (dt_object_exists(tobj) == 0 ||
2862             lfsck_is_dead_obj(tobj))
2863                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2864
2865         if (!S_ISREG(lfsck_object_type(tobj)))
2866                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2867
2868         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
2869          * remote one on another MDT. Then check whether the given OST-object
2870          * is in such layout. If yes, it is multiple referenced, otherwise it
2871          * is unmatched referenced case. */
2872         rc = lfsck_layout_get_lovea(env, tobj, buf);
2873         if (rc == 0 || rc == -ENOENT)
2874                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2875
2876         if (rc < 0)
2877                 GOTO(out, rc);
2878
2879         lmm = buf->lb_buf;
2880         magic = le32_to_cpu(lmm->lmm_magic);
2881         if (magic == LOV_MAGIC_V1) {
2882                 objs = &lmm->lmm_objects[0];
2883         } else {
2884                 LASSERT(magic == LOV_MAGIC_V3);
2885                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2886         }
2887
2888         count = le16_to_cpu(lmm->lmm_stripe_count);
2889         for (i = 0; i < count; i++, objs++) {
2890                 struct lu_fid           *tfid   = &info->lti_fid2;
2891                 struct ost_id           *oi     = &info->lti_oi;
2892                 __u32                    idx2;
2893
2894                 if (lovea_slot_is_dummy(objs))
2895                         continue;
2896
2897                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2898                 idx2 = le32_to_cpu(objs->l_ost_idx);
2899                 rc = ostid_to_fid(tfid, oi, idx2);
2900                 if (rc != 0) {
2901                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2902                                "invalid layout EA at the slot %d, index %u\n",
2903                                lfsck_lfsck2name(com->lc_lfsck),
2904                                PFID(pfid), i, idx2);
2905
2906                         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2907                 }
2908
2909                 if (lu_fid_eq(cfid, tfid)) {
2910                         *lov_ea = *buf;
2911
2912                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
2913                 }
2914         }
2915
2916         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2917
2918 out:
2919         dt_read_unlock(env, tobj);
2920         lfsck_object_put(env, tobj);
2921
2922         return rc;
2923 }
2924
2925 static int lfsck_layout_assistant_handler_p1(const struct lu_env *env,
2926                                              struct lfsck_component *com,
2927                                              struct lfsck_assistant_req *lar)
2928 {
2929         struct lfsck_layout_req              *llr    =
2930                         container_of0(lar, struct lfsck_layout_req, llr_lar);
2931         struct lfsck_layout                  *lo     = com->lc_file_ram;
2932         struct lfsck_thread_info             *info   = lfsck_env_info(env);
2933         struct filter_fid_old                *pea    = &info->lti_old_pfid;
2934         struct lu_fid                        *pfid   = &info->lti_fid;
2935         struct lu_buf                         buf    = { 0 };
2936         struct dt_object                     *parent = llr->llr_parent->llo_obj;
2937         struct dt_object                     *child  = llr->llr_child;
2938         struct lu_attr                       *pla    = &info->lti_la;
2939         struct lu_attr                       *cla    = &info->lti_la2;
2940         struct lfsck_instance                *lfsck  = com->lc_lfsck;
2941         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
2942         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
2943         __u32                                 idx    = 0;
2944         int                                   rc;
2945         ENTRY;
2946
2947         if (unlikely(lfsck_is_dead_obj(parent)))
2948                 RETURN(0);
2949
2950         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
2951         if (rc != 0)
2952                 GOTO(out, rc);
2953
2954         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
2955         if (rc == -ENOENT) {
2956                 if (unlikely(lfsck_is_dead_obj(parent)))
2957                         RETURN(0);
2958
2959                 type = LLIT_DANGLING;
2960                 goto repair;
2961         }
2962
2963         if (rc != 0)
2964                 GOTO(out, rc);
2965
2966         lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
2967         rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
2968         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
2969                      rc != sizeof(struct filter_fid))) {
2970                 type = LLIT_UNMATCHED_PAIR;
2971                 goto repair;
2972         }
2973
2974         if (rc < 0 && rc != -ENODATA)
2975                 GOTO(out, rc);
2976
2977         if (rc == -ENODATA) {
2978                 fid_zero(pfid);
2979         } else {
2980                 fid_le_to_cpu(pfid, &pea->ff_parent);
2981                 /* Currently, the filter_fid::ff_parent::f_ver is not the
2982                  * real parent MDT-object's FID::f_ver, instead it is the
2983                  * OST-object index in its parent MDT-object's layout EA. */
2984                 idx = pfid->f_stripe_idx;
2985                 pfid->f_ver = 0;
2986         }
2987
2988         rc = lfsck_layout_check_parent(env, com, parent, pfid,
2989                                        lu_object_fid(&child->do_lu),
2990                                        pla, cla, llr, &buf, idx);
2991         if (rc > 0) {
2992                 type = rc;
2993                 goto repair;
2994         }
2995
2996         if (rc < 0)
2997                 GOTO(out, rc);
2998
2999         if (unlikely(cla->la_uid != pla->la_uid ||
3000                      cla->la_gid != pla->la_gid)) {
3001                 type = LLIT_INCONSISTENT_OWNER;
3002                 goto repair;
3003         }
3004
3005 repair:
3006         if (bk->lb_param & LPF_DRYRUN) {
3007                 if (type != LLIT_NONE)
3008                         GOTO(out, rc = 1);
3009                 else
3010                         GOTO(out, rc = 0);
3011         }
3012
3013         switch (type) {
3014         case LLIT_DANGLING:
3015                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3016                 break;
3017         case LLIT_UNMATCHED_PAIR:
3018                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3019                 break;
3020         case LLIT_MULTIPLE_REFERENCED:
3021                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3022                                                              pla, &buf);
3023                 break;
3024         case LLIT_INCONSISTENT_OWNER:
3025                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3026                 break;
3027         default:
3028                 rc = 0;
3029                 break;
3030         }
3031
3032         GOTO(out, rc);
3033
3034 out:
3035         down_write(&com->lc_sem);
3036         if (rc < 0) {
3037                 struct lfsck_assistant_data *lad = com->lc_data;
3038
3039                 if (unlikely(lad->lad_exit)) {
3040                         rc = 0;
3041                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3042                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3043                            rc == -EHOSTUNREACH) {
3044                         /* If cannot touch the target server,
3045                          * mark the LFSCK as INCOMPLETE. */
3046                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3047                                "talk with OST %x: rc = %d\n",
3048                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3049                         lo->ll_flags |= LF_INCOMPLETE;
3050                         lo->ll_objs_skipped++;
3051                         rc = 0;
3052                 } else {
3053                         lfsck_layout_record_failure(env, lfsck, lo);
3054                 }
3055         } else if (rc > 0) {
3056                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3057                          "unknown type = %d\n", type);
3058
3059                 lo->ll_objs_repaired[type - 1]++;
3060                 if (bk->lb_param & LPF_DRYRUN &&
3061                     unlikely(lo->ll_pos_first_inconsistent == 0))
3062                         lo->ll_pos_first_inconsistent =
3063                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3064                                                         lfsck->li_di_oit);
3065         }
3066         up_write(&com->lc_sem);
3067
3068         return rc;
3069 }
3070
3071 static int lfsck_layout_assistant_handler_p2(const struct lu_env *env,
3072                                              struct lfsck_component *com)
3073 {
3074         struct lfsck_assistant_data     *lad    = com->lc_data;
3075         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3076         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
3077         struct lfsck_tgt_descs          *ltds   = &lfsck->li_ost_descs;
3078         struct lfsck_tgt_desc           *ltd;
3079         int                              rc     = 0;
3080         ENTRY;
3081
3082         spin_lock(&ltds->ltd_lock);
3083         while (!list_empty(&lad->lad_ost_phase2_list)) {
3084                 ltd = list_entry(lad->lad_ost_phase2_list.next,
3085                                  struct lfsck_tgt_desc,
3086                                  ltd_layout_phase_list);
3087                 list_del_init(&ltd->ltd_layout_phase_list);
3088                 if (bk->lb_param & LPF_ALL_TGT) {
3089                         spin_unlock(&ltds->ltd_lock);
3090                         rc = lfsck_layout_scan_orphan(env, com, ltd);
3091                         if (rc != 0 && bk->lb_param & LPF_FAILOUT)
3092                                 RETURN(rc);
3093
3094                         if (unlikely(lad->lad_exit ||
3095                                      !thread_is_running(&lfsck->li_thread)))
3096                                 RETURN(0);
3097                         spin_lock(&ltds->ltd_lock);
3098                 }
3099         }
3100
3101         if (list_empty(&lad->lad_ost_phase1_list))
3102                 rc = 1;
3103         else
3104                 rc = 0;
3105         spin_unlock(&ltds->ltd_lock);
3106
3107         RETURN(rc);
3108 }
3109
3110 static int
3111 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3112                                    struct ptlrpc_request *req,
3113                                    void *args, int rc)
3114 {
3115         struct lfsck_layout_slave_async_args *llsaa = args;
3116         struct obd_export                    *exp   = llsaa->llsaa_exp;
3117         struct lfsck_component               *com   = llsaa->llsaa_com;
3118         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3119         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3120         struct lfsck_reply                   *lr    = NULL;
3121         bool                                  done  = false;
3122
3123         if (rc != 0) {
3124                 /* It is quite probably caused by target crash,
3125                  * to make the LFSCK can go ahead, assume that
3126                  * the target finished the LFSCK prcoessing. */
3127                 done = true;
3128         } else {
3129                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3130                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3131                     lr->lr_status != LS_SCANNING_PHASE2)
3132                         done = true;
3133         }
3134
3135         if (done) {
3136                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3137                        "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3138                        llst->llst_index, lr != NULL ? lr->lr_status : rc);
3139
3140                 lfsck_layout_llst_del(llsd, llst);
3141         }
3142
3143         lfsck_layout_llst_put(llst);
3144         lfsck_component_put(env, com);
3145         class_export_put(exp);
3146
3147         return 0;
3148 }
3149
3150 static int lfsck_layout_async_query(const struct lu_env *env,
3151                                     struct lfsck_component *com,
3152                                     struct obd_export *exp,
3153                                     struct lfsck_layout_slave_target *llst,
3154                                     struct lfsck_request *lr,
3155                                     struct ptlrpc_request_set *set)
3156 {
3157         struct lfsck_layout_slave_async_args *llsaa;
3158         struct ptlrpc_request                *req;
3159         struct lfsck_request                 *tmp;
3160         int                                   rc;
3161         ENTRY;
3162
3163         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3164         if (req == NULL)
3165                 RETURN(-ENOMEM);
3166
3167         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3168         if (rc != 0) {
3169                 ptlrpc_request_free(req);
3170                 RETURN(rc);
3171         }
3172
3173         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3174         *tmp = *lr;
3175         ptlrpc_request_set_replen(req);
3176
3177         llsaa = ptlrpc_req_async_args(req);
3178         llsaa->llsaa_exp = exp;
3179         llsaa->llsaa_com = lfsck_component_get(com);
3180         llsaa->llsaa_llst = llst;
3181         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3182         ptlrpc_set_add_req(set, req);
3183
3184         RETURN(0);
3185 }
3186
3187 static int lfsck_layout_async_notify(const struct lu_env *env,
3188                                      struct obd_export *exp,
3189                                      struct lfsck_request *lr,
3190                                      struct ptlrpc_request_set *set)
3191 {
3192         struct ptlrpc_request   *req;
3193         struct lfsck_request    *tmp;
3194         int                      rc;
3195         ENTRY;
3196
3197         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3198         if (req == NULL)
3199                 RETURN(-ENOMEM);
3200
3201         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3202         if (rc != 0) {
3203                 ptlrpc_request_free(req);
3204                 RETURN(rc);
3205         }
3206
3207         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3208         *tmp = *lr;
3209         ptlrpc_request_set_replen(req);
3210         ptlrpc_set_add_req(set, req);
3211
3212         RETURN(0);
3213 }
3214
3215 static int
3216 lfsck_layout_slave_query_master(const struct lu_env *env,
3217                                 struct lfsck_component *com)
3218 {
3219         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3220         struct lfsck_instance            *lfsck = com->lc_lfsck;
3221         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3222         struct lfsck_layout_slave_target *llst;
3223         struct obd_export                *exp;
3224         struct ptlrpc_request_set        *set;
3225         int                               rc    = 0;
3226         int                               rc1   = 0;
3227         ENTRY;
3228
3229         set = ptlrpc_prep_set();
3230         if (set == NULL)
3231                 GOTO(log, rc = -ENOMEM);
3232
3233         memset(lr, 0, sizeof(*lr));
3234         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3235         lr->lr_event = LE_QUERY;
3236         lr->lr_active = LFSCK_TYPE_LAYOUT;
3237
3238         llsd->llsd_touch_gen++;
3239         spin_lock(&llsd->llsd_lock);
3240         while (!list_empty(&llsd->llsd_master_list)) {
3241                 llst = list_entry(llsd->llsd_master_list.next,
3242                                   struct lfsck_layout_slave_target,
3243                                   llst_list);
3244                 if (llst->llst_gen == llsd->llsd_touch_gen)
3245                         break;
3246
3247                 llst->llst_gen = llsd->llsd_touch_gen;
3248                 list_move_tail(&llst->llst_list,
3249                                &llsd->llsd_master_list);
3250                 atomic_inc(&llst->llst_ref);
3251                 spin_unlock(&llsd->llsd_lock);
3252
3253                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3254                                                llst->llst_index);
3255                 if (exp == NULL) {
3256                         lfsck_layout_llst_del(llsd, llst);
3257                         lfsck_layout_llst_put(llst);
3258                         spin_lock(&llsd->llsd_lock);
3259                         continue;
3260                 }
3261
3262                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3263                 if (rc != 0) {
3264                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3265                                "query %s for layout: rc = %d\n",
3266                                lfsck_lfsck2name(lfsck),
3267                                exp->exp_obd->obd_name, rc);
3268
3269                         rc1 = rc;
3270                         lfsck_layout_llst_put(llst);
3271                         class_export_put(exp);
3272                 }
3273                 spin_lock(&llsd->llsd_lock);
3274         }
3275         spin_unlock(&llsd->llsd_lock);
3276
3277         rc = ptlrpc_set_wait(set);
3278         ptlrpc_set_destroy(set);
3279
3280         GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
3281
3282 log:
3283         CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
3284                lfsck_lfsck2name(com->lc_lfsck), rc);
3285
3286         return rc;
3287 }
3288
3289 static void
3290 lfsck_layout_slave_notify_master(const struct lu_env *env,
3291                                  struct lfsck_component *com,
3292                                  enum lfsck_events event, int result)
3293 {
3294         struct lfsck_instance            *lfsck = com->lc_lfsck;
3295         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3296         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3297         struct lfsck_layout_slave_target *llst;
3298         struct obd_export                *exp;
3299         struct ptlrpc_request_set        *set;
3300         int                               rc;
3301         ENTRY;
3302
3303         CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
3304                lfsck_lfsck2name(com->lc_lfsck));
3305
3306         set = ptlrpc_prep_set();
3307         if (set == NULL)
3308                 RETURN_EXIT;
3309
3310         memset(lr, 0, sizeof(*lr));
3311         lr->lr_event = event;
3312         lr->lr_flags = LEF_FROM_OST;
3313         lr->lr_status = result;
3314         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3315         lr->lr_active = LFSCK_TYPE_LAYOUT;
3316         llsd->llsd_touch_gen++;
3317         spin_lock(&llsd->llsd_lock);
3318         while (!list_empty(&llsd->llsd_master_list)) {
3319                 llst = list_entry(llsd->llsd_master_list.next,
3320                                   struct lfsck_layout_slave_target,
3321                                   llst_list);
3322                 if (llst->llst_gen == llsd->llsd_touch_gen)
3323                         break;
3324
3325                 llst->llst_gen = llsd->llsd_touch_gen;
3326                 list_move_tail(&llst->llst_list,
3327                                &llsd->llsd_master_list);
3328                 atomic_inc(&llst->llst_ref);
3329                 spin_unlock(&llsd->llsd_lock);
3330
3331                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3332                                                llst->llst_index);
3333                 if (exp == NULL) {
3334                         lfsck_layout_llst_del(llsd, llst);
3335                         lfsck_layout_llst_put(llst);
3336                         spin_lock(&llsd->llsd_lock);
3337                         continue;
3338                 }
3339
3340                 rc = lfsck_layout_async_notify(env, exp, lr, set);
3341                 if (rc != 0)
3342                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3343                                "notify %s for layout: rc = %d\n",
3344                                lfsck_lfsck2name(lfsck),
3345                                exp->exp_obd->obd_name, rc);
3346
3347                 lfsck_layout_llst_put(llst);
3348                 class_export_put(exp);
3349                 spin_lock(&llsd->llsd_lock);
3350         }
3351         spin_unlock(&llsd->llsd_lock);
3352
3353         ptlrpc_set_wait(set);
3354         ptlrpc_set_destroy(set);
3355
3356         RETURN_EXIT;
3357 }
3358
3359 /*
3360  * \ret -ENODATA: unrecognized stripe
3361  * \ret = 0     : recognized stripe
3362  * \ret < 0     : other failures
3363  */
3364 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
3365                                            struct lfsck_component *com,
3366                                            struct lu_fid *cfid,
3367                                            struct lu_fid *pfid)
3368 {
3369         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3370         struct lu_buf                   *buf    = &info->lti_big_buf;
3371         struct ost_id                   *oi     = &info->lti_oi;
3372         struct dt_object                *obj;
3373         struct lov_mds_md_v1            *lmm;
3374         struct lov_ost_data_v1          *objs;
3375         __u32                            idx    = pfid->f_stripe_idx;
3376         __u32                            magic;
3377         int                              rc     = 0;
3378         int                              i;
3379         __u16                            count;
3380         ENTRY;
3381
3382         pfid->f_ver = 0;
3383         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3384         if (IS_ERR(obj))
3385                 RETURN(PTR_ERR(obj));
3386
3387         dt_read_lock(env, obj, 0);
3388         if (unlikely(dt_object_exists(obj) == 0 ||
3389                      lfsck_is_dead_obj(obj)))
3390                 GOTO(unlock, rc = -ENOENT);
3391
3392         if (!S_ISREG(lfsck_object_type(obj)))
3393                 GOTO(unlock, rc = -ENODATA);
3394
3395         rc = lfsck_layout_get_lovea(env, obj, buf);
3396         if (rc < 0)
3397                 GOTO(unlock, rc);
3398
3399         if (rc == 0)
3400                 GOTO(unlock, rc = -ENODATA);
3401
3402         lmm = buf->lb_buf;
3403         rc = lfsck_layout_verify_header(lmm);
3404         if (rc != 0)
3405                 GOTO(unlock, rc);
3406
3407         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3408          * been verified in lfsck_layout_verify_header() already. If some
3409          * new magic introduced in the future, then layout LFSCK needs to
3410          * be updated also. */
3411         magic = le32_to_cpu(lmm->lmm_magic);
3412         if (magic == LOV_MAGIC_V1) {
3413                 objs = &lmm->lmm_objects[0];
3414         } else {
3415                 LASSERT(magic == LOV_MAGIC_V3);
3416                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3417         }
3418
3419         fid_to_ostid(cfid, oi);
3420         count = le16_to_cpu(lmm->lmm_stripe_count);
3421         for (i = 0; i < count; i++, objs++) {
3422                 struct ost_id oi2;
3423
3424                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
3425                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
3426                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
3427         }
3428
3429         GOTO(unlock, rc = -ENODATA);
3430
3431 unlock:
3432         dt_read_unlock(env, obj);
3433         lu_object_put(env, &obj->do_lu);
3434
3435         return rc;
3436 }
3437
3438 /*
3439  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
3440  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
3441  * layout EA from MDT to OST. On one hand, the OST no need to understand
3442  * the layout EA structure; on the other hand, it may cause trouble when
3443  * transfer large layout EA from MDT to OST via normal OUT RPC.
3444  *
3445  * \ret > 0: unrecognized stripe
3446  * \ret = 0: recognized stripe
3447  * \ret < 0: other failures
3448  */
3449 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
3450                                           struct lfsck_component *com,
3451                                           struct lu_fid *cfid,
3452                                           struct lu_fid *pfid)
3453 {
3454         struct lfsck_instance    *lfsck  = com->lc_lfsck;
3455         struct obd_device        *obd    = lfsck->li_obd;
3456         struct seq_server_site   *ss     =
3457                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
3458         struct obd_export        *exp    = NULL;
3459         struct ptlrpc_request    *req    = NULL;
3460         struct lfsck_request     *lr;
3461         struct lu_seq_range       range  = { 0 };
3462         int                       rc     = 0;
3463         ENTRY;
3464
3465         if (unlikely(fid_is_idif(pfid)))
3466                 RETURN(1);
3467
3468         fld_range_set_any(&range);
3469         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
3470         if (rc != 0)
3471                 RETURN(rc == -ENOENT ? 1 : rc);
3472
3473         if (unlikely(!fld_range_is_mdt(&range)))
3474                 RETURN(1);
3475
3476         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
3477         if (unlikely(exp == NULL))
3478                 RETURN(1);
3479
3480         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
3481                 GOTO(out, rc = -EOPNOTSUPP);
3482
3483         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3484         if (req == NULL)
3485                 GOTO(out, rc = -ENOMEM);
3486
3487         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3488         if (rc != 0) {
3489                 ptlrpc_request_free(req);
3490
3491                 GOTO(out, rc);
3492         }
3493
3494         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3495         memset(lr, 0, sizeof(*lr));
3496         lr->lr_event = LE_PAIRS_VERIFY;
3497         lr->lr_active = LFSCK_TYPE_LAYOUT;
3498         lr->lr_fid = *cfid; /* OST-object itself FID. */
3499         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
3500
3501         ptlrpc_request_set_replen(req);
3502         rc = ptlrpc_queue_wait(req);
3503         ptlrpc_req_finished(req);
3504
3505         if (rc == -ENOENT || rc == -ENODATA)
3506                 rc = 1;
3507
3508         GOTO(out, rc);
3509
3510 out:
3511         if (exp != NULL)
3512                 class_export_put(exp);
3513
3514         return rc;
3515 }
3516
3517 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
3518                                           struct lfsck_component *com,
3519                                           struct lfsck_request *lr)
3520 {
3521         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3522         struct filter_fid               *ff     = &info->lti_new_pfid;
3523         struct lu_buf                   *buf;
3524         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
3525         struct dt_object                *obj;
3526         struct thandle                  *th     = NULL;
3527         int                              rc     = 0;
3528         ENTRY;
3529
3530         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
3531         if (IS_ERR(obj))
3532                 GOTO(log, rc = PTR_ERR(obj));
3533
3534         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
3535         buf = lfsck_buf_get(env, ff, sizeof(*ff));
3536         dt_write_lock(env, obj, 0);
3537         if (unlikely(dt_object_exists(obj) == 0 ||
3538                      lfsck_is_dead_obj(obj)))
3539                 GOTO(unlock, rc = 0);
3540
3541         th = dt_trans_create(env, dev);
3542         if (IS_ERR(th))
3543                 GOTO(unlock, rc = PTR_ERR(th));
3544
3545         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
3546         if (rc != 0)
3547                 GOTO(stop, rc);
3548
3549         rc = dt_trans_start_local(env, dev, th);
3550         if (rc != 0)
3551                 GOTO(stop, rc);
3552
3553         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
3554
3555         GOTO(stop, rc);
3556
3557 stop:
3558         dt_trans_stop(env, dev, th);
3559
3560 unlock:
3561         dt_write_unlock(env, obj);
3562         lu_object_put(env, &obj->do_lu);
3563
3564 log:
3565         CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
3566                ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
3567                PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
3568
3569         return rc;
3570 }
3571
3572 /* layout APIs */
3573
3574 static int lfsck_layout_reset(const struct lu_env *env,
3575                               struct lfsck_component *com, bool init)
3576 {
3577         struct lfsck_layout     *lo    = com->lc_file_ram;
3578         int                      rc;
3579
3580         down_write(&com->lc_sem);
3581         if (init) {
3582                 memset(lo, 0, com->lc_file_size);
3583         } else {
3584                 __u32 count = lo->ll_success_count;
3585                 __u64 last_time = lo->ll_time_last_complete;
3586
3587                 memset(lo, 0, com->lc_file_size);
3588                 lo->ll_success_count = count;
3589                 lo->ll_time_last_complete = last_time;
3590         }
3591
3592         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
3593         lo->ll_status = LS_INIT;
3594
3595         rc = lfsck_layout_store(env, com);
3596         up_write(&com->lc_sem);
3597
3598         CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
3599                lfsck_lfsck2name(com->lc_lfsck), rc);
3600
3601         return rc;
3602 }
3603
3604 static void lfsck_layout_fail(const struct lu_env *env,
3605                               struct lfsck_component *com, bool new_checked)
3606 {
3607         struct lfsck_layout *lo = com->lc_file_ram;
3608
3609         down_write(&com->lc_sem);
3610         if (new_checked)
3611                 com->lc_new_checked++;
3612         lfsck_layout_record_failure(env, com->lc_lfsck, lo);
3613         up_write(&com->lc_sem);
3614 }
3615
3616 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
3617                                           struct lfsck_component *com, bool init)
3618 {
3619         struct lfsck_instance   *lfsck   = com->lc_lfsck;
3620         struct lfsck_layout     *lo      = com->lc_file_ram;
3621         int                      rc;
3622
3623         if (!init) {
3624                 rc = lfsck_checkpoint_generic(env, com);
3625                 if (rc != 0)
3626                         return rc > 0 ? 0 : rc;
3627         }
3628
3629         down_write(&com->lc_sem);
3630         if (init) {
3631                 lo->ll_pos_latest_start =
3632                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
3633         } else {
3634                 lo->ll_pos_last_checkpoint =
3635                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
3636                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3637                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3638                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3639                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3640                 com->lc_new_checked = 0;
3641         }
3642
3643         rc = lfsck_layout_store(env, com);
3644         up_write(&com->lc_sem);
3645
3646         CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
3647                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3648                lfsck->li_pos_current.lp_oit_cookie, rc);
3649
3650         return rc;
3651 }
3652
3653 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
3654                                          struct lfsck_component *com, bool init)
3655 {
3656         struct lfsck_instance   *lfsck = com->lc_lfsck;
3657         struct lfsck_layout     *lo    = com->lc_file_ram;
3658         int                      rc;
3659
3660         if (com->lc_new_checked == 0 && !init)
3661                 return 0;
3662
3663         down_write(&com->lc_sem);
3664         if (init) {
3665                 lo->ll_pos_latest_start =
3666                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
3667         } else {
3668                 lo->ll_pos_last_checkpoint =
3669                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
3670                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3671                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3672                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3673                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3674                 com->lc_new_checked = 0;
3675         }
3676
3677         rc = lfsck_layout_store(env, com);
3678         up_write(&com->lc_sem);
3679
3680         CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
3681                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
3682                lfsck->li_pos_current.lp_oit_cookie, rc);
3683
3684         return rc;
3685 }
3686
3687 static int lfsck_layout_prep(const struct lu_env *env,
3688                              struct lfsck_component *com,
3689                              struct lfsck_start *start)
3690 {
3691         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3692         struct lfsck_layout     *lo     = com->lc_file_ram;
3693         struct lfsck_position   *pos    = &com->lc_pos_start;
3694
3695         fid_zero(&pos->lp_dir_parent);
3696         pos->lp_dir_cookie = 0;
3697         if (lo->ll_status == LS_COMPLETED ||
3698             lo->ll_status == LS_PARTIAL ||
3699             /* To handle orphan, must scan from the beginning. */
3700             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
3701                 int rc;
3702
3703                 rc = lfsck_layout_reset(env, com, false);
3704                 if (rc == 0)
3705                         rc = lfsck_set_param(env, lfsck, start, true);
3706
3707                 if (rc != 0) {
3708                         CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
3709                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
3710
3711                         return rc;
3712                 }
3713         }
3714
3715         down_write(&com->lc_sem);
3716         lo->ll_time_latest_start = cfs_time_current_sec();
3717         spin_lock(&lfsck->li_lock);
3718         if (lo->ll_flags & LF_SCANNED_ONCE) {
3719                 if (!lfsck->li_drop_dryrun ||
3720                     lo->ll_pos_first_inconsistent == 0) {
3721                         lo->ll_status = LS_SCANNING_PHASE2;
3722                         list_move_tail(&com->lc_link,
3723                                        &lfsck->li_list_double_scan);
3724                         pos->lp_oit_cookie = 0;
3725                 } else {
3726                         int i;
3727
3728                         lo->ll_status = LS_SCANNING_PHASE1;
3729                         lo->ll_run_time_phase1 = 0;
3730                         lo->ll_run_time_phase2 = 0;
3731                         lo->ll_objs_checked_phase1 = 0;
3732                         lo->ll_objs_checked_phase2 = 0;
3733                         lo->ll_objs_failed_phase1 = 0;
3734                         lo->ll_objs_failed_phase2 = 0;
3735                         for (i = 0; i < LLIT_MAX; i++)
3736                                 lo->ll_objs_repaired[i] = 0;
3737
3738                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3739                         fid_zero(&com->lc_fid_latest_scanned_phase2);
3740                 }
3741         } else {
3742                 lo->ll_status = LS_SCANNING_PHASE1;
3743                 if (!lfsck->li_drop_dryrun ||
3744                     lo->ll_pos_first_inconsistent == 0)
3745                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
3746                 else
3747                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3748         }
3749         spin_unlock(&lfsck->li_lock);
3750         up_write(&com->lc_sem);
3751
3752         return 0;
3753 }
3754
3755 static int lfsck_layout_slave_prep(const struct lu_env *env,
3756                                    struct lfsck_component *com,
3757                                    struct lfsck_start_param *lsp)
3758 {
3759         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3760         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3761         struct lfsck_layout             *lo     = com->lc_file_ram;
3762         struct lfsck_start              *start  = lsp->lsp_start;
3763         int                              rc;
3764
3765         rc = lfsck_layout_prep(env, com, start);
3766         if (rc != 0)
3767                 return rc;
3768
3769         if (lo->ll_flags & LF_CRASHED_LASTID &&
3770             list_empty(&llsd->llsd_master_list)) {
3771                 LASSERT(lfsck->li_out_notify != NULL);
3772
3773                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
3774                                      LE_LASTID_REBUILDING);
3775         }
3776
3777         if (!lsp->lsp_index_valid)
3778                 return 0;
3779
3780         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
3781         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
3782                 LASSERT(!llsd->llsd_rbtree_valid);
3783
3784                 write_lock(&llsd->llsd_rb_lock);
3785                 rc = lfsck_rbtree_setup(env, com);
3786                 write_unlock(&llsd->llsd_rb_lock);
3787         }
3788
3789         CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
3790                LPU64"]\n", lfsck_lfsck2name(lfsck),
3791                com->lc_pos_start.lp_oit_cookie);
3792
3793         return rc;
3794 }
3795
3796 static int lfsck_layout_master_prep(const struct lu_env *env,
3797                                     struct lfsck_component *com,
3798                                     struct lfsck_start_param *lsp)
3799 {
3800         int rc;
3801         ENTRY;
3802
3803         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
3804         if (rc != 0)
3805                 RETURN(rc);
3806
3807         rc = lfsck_start_assistant(env, com, lsp);
3808
3809         CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
3810                LPU64"\n", lfsck_lfsck2name(com->lc_lfsck),
3811                com->lc_pos_start.lp_oit_cookie);
3812
3813         RETURN(rc);
3814 }
3815
3816 /* Pre-fetch the attribute for each stripe in the given layout EA. */
3817 static int lfsck_layout_scan_stripes(const struct lu_env *env,
3818                                      struct lfsck_component *com,
3819                                      struct dt_object *parent,
3820                                      struct lov_mds_md_v1 *lmm)
3821 {
3822         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3823         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3824         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3825         struct lfsck_layout             *lo      = com->lc_file_ram;
3826         struct lfsck_assistant_data     *lad     = com->lc_data;
3827         struct lfsck_layout_object      *llo     = NULL;
3828         struct lov_ost_data_v1          *objs;
3829         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
3830         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3831         struct ptlrpc_thread            *athread = &lad->lad_thread;
3832         struct l_wait_info               lwi     = { 0 };
3833         struct lu_buf                    buf;
3834         int                              rc      = 0;
3835         int                              i;
3836         __u32                            magic;
3837         __u16                            count;
3838         __u16                            gen;
3839         ENTRY;
3840
3841         lfsck_buf_init(&buf, &info->lti_old_pfid,
3842                        sizeof(struct filter_fid_old));
3843         count = le16_to_cpu(lmm->lmm_stripe_count);
3844         gen = le16_to_cpu(lmm->lmm_layout_gen);
3845         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3846          * been verified in lfsck_layout_verify_header() already. If some
3847          * new magic introduced in the future, then layout LFSCK needs to
3848          * be updated also. */
3849         magic = le32_to_cpu(lmm->lmm_magic);
3850         if (magic == LOV_MAGIC_V1) {
3851                 objs = &lmm->lmm_objects[0];
3852         } else {
3853                 LASSERT(magic == LOV_MAGIC_V3);
3854                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3855         }
3856
3857         for (i = 0; i < count; i++, objs++) {
3858                 struct lu_fid           *fid    = &info->lti_fid;
3859                 struct ost_id           *oi     = &info->lti_oi;
3860                 struct lfsck_layout_req *llr;
3861                 struct lfsck_tgt_desc   *tgt    = NULL;
3862                 struct dt_object        *cobj   = NULL;
3863                 __u32                    index;
3864                 bool                     wakeup = false;
3865
3866                 if (unlikely(lovea_slot_is_dummy(objs)))
3867                         continue;
3868
3869                 l_wait_event(mthread->t_ctl_waitq,
3870                              bk->lb_async_windows == 0 ||
3871                              lad->lad_prefetched < bk->lb_async_windows ||
3872                              !thread_is_running(mthread) ||
3873                              thread_is_stopped(athread),
3874                              &lwi);
3875
3876                 if (unlikely(!thread_is_running(mthread)) ||
3877                              thread_is_stopped(athread))
3878                         GOTO(out, rc = 0);
3879
3880                 if (unlikely(lfsck_is_dead_obj(parent)))
3881                         GOTO(out, rc = 0);
3882
3883                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3884                 index = le32_to_cpu(objs->l_ost_idx);
3885                 rc = ostid_to_fid(fid, oi, index);
3886                 if (rc != 0) {
3887                         CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
3888                                ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
3889                                PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
3890                         goto next;
3891                 }
3892
3893                 tgt = lfsck_tgt_get(ltds, index);
3894                 if (unlikely(tgt == NULL)) {
3895                         CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
3896                                "did not join the layout LFSCK\n",
3897                                lfsck_lfsck2name(lfsck), index);
3898                         lo->ll_flags |= LF_INCOMPLETE;
3899                         goto next;
3900                 }
3901
3902                 /* There is potential deadlock race condition between object
3903                  * destroy and layout LFSCK. Consider the following scenario:
3904                  *
3905                  * 1) The LFSCK thread obtained the parent object firstly, at
3906                  *    that time, the parent object has not been destroyed yet.
3907                  *
3908                  * 2) One RPC service thread destroyed the parent and all its
3909                  *    children objects. Because the LFSCK is referencing the
3910                  *    parent object, then the parent object will be marked as
3911                  *    dying in RAM. On the other hand, the parent object is
3912                  *    referencing all its children objects, then all children
3913                  *    objects will be marked as dying in RAM also.
3914                  *
3915                  * 3) The LFSCK thread tries to find some child object with
3916                  *    the parent object referenced. Then it will find that the
3917                  *    child object is dying. According to the object visibility
3918                  *    rules: the object with dying flag cannot be returned to
3919                  *    others. So the LFSCK thread has to wait until the dying
3920                  *    object has been purged from RAM, then it can allocate a
3921                  *    new object (with the same FID) in RAM. Unfortunately, the
3922                  *    LFSCK thread itself is referencing the parent object, and
3923                  *    cause the parent object cannot be purged, then cause the
3924                  *    child object cannot be purged also. So the LFSCK thread
3925                  *    will fall into deadlock.
3926                  *
3927                  * We introduce non-blocked version lu_object_find() to allow
3928                  * the LFSCK thread to return failure immediately (instead of
3929                  * wait) when it finds dying (child) object, then the LFSCK
3930                  * thread can check whether the parent object is dying or not.
3931                  * So avoid above deadlock. LU-5395 */
3932                 cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
3933                 if (IS_ERR(cobj)) {
3934                         if (lfsck_is_dead_obj(parent)) {
3935                                 lfsck_tgt_put(tgt);
3936
3937                                 GOTO(out, rc = 0);
3938                         }
3939
3940                         rc = PTR_ERR(cobj);
3941                         goto next;
3942                 }
3943
3944                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
3945                 if (rc != 0)
3946                         goto next;
3947
3948                 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
3949                                           BYPASS_CAPA);
3950                 if (rc != 0)
3951                         goto next;
3952
3953                 if (llo == NULL) {
3954                         llo = lfsck_layout_object_init(env, parent,
3955                                 lfsck->li_pos_current.lp_oit_cookie, gen);
3956                         if (IS_ERR(llo)) {
3957                                 rc = PTR_ERR(llo);
3958                                 goto next;
3959                         }
3960                 }
3961
3962                 llr = lfsck_layout_assistant_req_init(llo, cobj, index, i);
3963                 if (IS_ERR(llr)) {
3964                         rc = PTR_ERR(llr);
3965                         goto next;
3966                 }
3967
3968                 cobj = NULL;
3969                 spin_lock(&lad->lad_lock);
3970                 if (lad->lad_assistant_status < 0) {
3971                         spin_unlock(&lad->lad_lock);
3972                         lfsck_layout_assistant_req_fini(env, &llr->llr_lar);
3973                         lfsck_tgt_put(tgt);
3974                         RETURN(lad->lad_assistant_status);
3975                 }
3976
3977                 list_add_tail(&llr->llr_lar.lar_list, &lad->lad_req_list);
3978                 if (lad->lad_prefetched == 0)
3979                         wakeup = true;
3980
3981                 lad->lad_prefetched++;
3982                 spin_unlock(&lad->lad_lock);
3983                 if (wakeup)
3984                         wake_up_all(&athread->t_ctl_waitq);
3985
3986 next:
3987                 down_write(&com->lc_sem);
3988                 com->lc_new_checked++;
3989                 if (rc < 0)
3990                         lfsck_layout_record_failure(env, lfsck, lo);
3991                 up_write(&com->lc_sem);
3992
3993                 if (cobj != NULL && !IS_ERR(cobj))
3994                         lu_object_put(env, &cobj->do_lu);
3995
3996                 if (likely(tgt != NULL))
3997                         lfsck_tgt_put(tgt);
3998
3999                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4000                         GOTO(out, rc);
4001         }
4002
4003         GOTO(out, rc = 0);
4004
4005 out:
4006         if (llo != NULL && !IS_ERR(llo))
4007                 lfsck_layout_object_put(env, llo);
4008
4009         return rc;
4010 }
4011
4012 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4013  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4014  * list ::lad_req_list.
4015  *
4016  * For each request on above list, the lfsck_layout_assistant thread compares
4017  * the OST side attribute with local attribute, if inconsistent, then repair it.
4018  *
4019  * All above processing is async mode with pipeline. */
4020 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4021                                         struct lfsck_component *com,
4022                                         struct dt_object *obj)
4023 {
4024         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4025         struct ost_id                   *oi     = &info->lti_oi;
4026         struct lfsck_layout             *lo     = com->lc_file_ram;
4027         struct lfsck_assistant_data     *lad    = com->lc_data;
4028         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4029         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4030         struct thandle                  *handle = NULL;
4031         struct lu_buf                   *buf    = &info->lti_big_buf;
4032         struct lov_mds_md_v1            *lmm    = NULL;
4033         struct dt_device                *dev    = lfsck->li_bottom;
4034         struct lustre_handle             lh     = { 0 };
4035         struct lu_buf                    ea_buf = { 0 };
4036         int                              rc     = 0;
4037         int                              size   = 0;
4038         bool                             locked = false;
4039         bool                             stripe = false;
4040         bool                             bad_oi = false;
4041         ENTRY;
4042
4043         if (!S_ISREG(lfsck_object_type(obj)))
4044                 GOTO(out, rc = 0);
4045
4046         if (lad->lad_assistant_status < 0)
4047                 GOTO(out, rc = -ESRCH);
4048
4049         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4050         lmm_oi_cpu_to_le(oi, oi);
4051         dt_read_lock(env, obj, 0);
4052         locked = true;
4053
4054 again:
4055         if (dt_object_exists(obj) == 0 ||
4056             lfsck_is_dead_obj(obj))
4057                 GOTO(out, rc = 0);
4058
4059         rc = lfsck_layout_get_lovea(env, obj, buf);
4060         if (rc <= 0)
4061                 GOTO(out, rc);
4062
4063         size = rc;
4064         lmm = buf->lb_buf;
4065         rc = lfsck_layout_verify_header(lmm);
4066         /* If the LOV EA crashed, then it is possible to be rebuilt later
4067          * when handle orphan OST-objects. */
4068         if (rc != 0)
4069                 GOTO(out, rc);
4070
4071         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4072                 GOTO(out, stripe = true);
4073
4074         /* Inconsistent lmm_oi, should be repaired. */
4075         bad_oi = true;
4076         lmm->lmm_oi = *oi;
4077
4078         if (bk->lb_param & LPF_DRYRUN) {
4079                 down_write(&com->lc_sem);
4080                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4081                 up_write(&com->lc_sem);
4082
4083                 GOTO(out, stripe = true);
4084         }
4085
4086         if (!lustre_handle_is_used(&lh)) {
4087                 dt_read_unlock(env, obj);
4088                 locked = false;
4089                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4090                                       MDS_INODELOCK_LAYOUT |
4091                                       MDS_INODELOCK_XATTR, LCK_EX);
4092                 if (rc != 0)
4093                         GOTO(out, rc);
4094
4095                 handle = dt_trans_create(env, dev);
4096                 if (IS_ERR(handle))
4097                         GOTO(out, rc = PTR_ERR(handle));
4098
4099                 lfsck_buf_init(&ea_buf, lmm, size);
4100                 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4101                                           LU_XATTR_REPLACE, handle);
4102                 if (rc != 0)
4103                         GOTO(out, rc);
4104
4105                 rc = dt_trans_start_local(env, dev, handle);
4106                 if (rc != 0)
4107                         GOTO(out, rc);
4108
4109                 dt_write_lock(env, obj, 0);
4110                 locked = true;
4111
4112                 goto again;
4113         }
4114
4115         rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4116                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4117         if (rc != 0)
4118                 GOTO(out, rc);
4119
4120         down_write(&com->lc_sem);
4121         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4122         up_write(&com->lc_sem);
4123
4124         GOTO(out, stripe = true);
4125
4126 out:
4127         if (locked) {
4128                 if (lustre_handle_is_used(&lh))
4129                         dt_write_unlock(env, obj);
4130                 else
4131                         dt_read_unlock(env, obj);
4132         }
4133
4134         if (handle != NULL && !IS_ERR(handle))
4135                 dt_trans_stop(env, dev, handle);
4136
4137         lfsck_ibits_unlock(&lh, LCK_EX);
4138
4139         if (bad_oi)
4140                 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4141                        DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4142                        bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4143                        PFID(lfsck_dto2fid(obj)), rc);
4144
4145         if (stripe) {
4146                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4147         } else {
4148                 down_write(&com->lc_sem);
4149                 com->lc_new_checked++;
4150                 if (rc < 0)
4151                         lfsck_layout_record_failure(env, lfsck, lo);
4152                 up_write(&com->lc_sem);
4153         }
4154
4155         return rc;
4156 }
4157
4158 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4159                                        struct lfsck_component *com,
4160                                        struct dt_object *obj)
4161 {
4162         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4163         struct lfsck_layout             *lo     = com->lc_file_ram;
4164         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4165         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4166         struct lfsck_layout_seq         *lls;
4167         __u64                            seq;
4168         __u64                            oid;
4169         int                              rc;
4170         ENTRY;
4171
4172         LASSERT(llsd != NULL);
4173
4174         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4175             cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4176                 struct l_wait_info       lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4177                                                            NULL, NULL);
4178                 struct ptlrpc_thread    *thread = &lfsck->li_thread;
4179
4180                 l_wait_event(thread->t_ctl_waitq,
4181                              !thread_is_running(thread),
4182                              &lwi);
4183         }
4184
4185         lfsck_rbtree_update_bitmap(env, com, fid, false);
4186
4187         down_write(&com->lc_sem);
4188         if (fid_is_idif(fid))
4189                 seq = 0;
4190         else if (!fid_is_norm(fid) ||
4191                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4192                 GOTO(unlock, rc = 0);
4193         else
4194                 seq = fid_seq(fid);
4195         com->lc_new_checked++;
4196
4197         lls = lfsck_layout_seq_lookup(llsd, seq);
4198         if (lls == NULL) {
4199                 OBD_ALLOC_PTR(lls);
4200                 if (unlikely(lls == NULL))
4201                         GOTO(unlock, rc = -ENOMEM);
4202
4203                 INIT_LIST_HEAD(&lls->lls_list);
4204                 lls->lls_seq = seq;
4205                 rc = lfsck_layout_lastid_load(env, com, lls);
4206                 if (rc != 0) {
4207                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4208                               "load LAST_ID for "LPX64": rc = %d\n",
4209                               lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4210                         lo->ll_objs_failed_phase1++;
4211                         OBD_FREE_PTR(lls);
4212                         GOTO(unlock, rc);
4213                 }
4214
4215                 lfsck_layout_seq_insert(llsd, lls);
4216         }
4217
4218         if (unlikely(fid_is_last_id(fid)))
4219                 GOTO(unlock, rc = 0);
4220
4221         if (fid_is_idif(fid))
4222                 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
4223         else
4224                 oid = fid_oid(fid);
4225
4226         if (oid > lls->lls_lastid_known)
4227                 lls->lls_lastid_known = oid;
4228
4229         if (oid > lls->lls_lastid) {
4230                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4231                         /* OFD may create new objects during LFSCK scanning. */
4232                         rc = lfsck_layout_lastid_reload(env, com, lls);
4233                         if (unlikely(rc != 0)) {
4234                                 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4235                                       "reload LAST_ID for "LPX64": rc = %d\n",
4236                                       lfsck_lfsck2name(com->lc_lfsck),
4237                                       lls->lls_seq, rc);
4238
4239                                 GOTO(unlock, rc);
4240                         }
4241
4242                         if (oid <= lls->lls_lastid ||
4243                             lo->ll_flags & LF_CRASHED_LASTID)
4244                                 GOTO(unlock, rc = 0);
4245
4246                         LASSERT(lfsck->li_out_notify != NULL);
4247
4248                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4249                                              LE_LASTID_REBUILDING);
4250                         lo->ll_flags |= LF_CRASHED_LASTID;
4251
4252                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
4253                                "LAST_ID file (2) for the sequence "LPX64
4254                                ", old value "LPU64", known value "LPU64"\n",
4255                                lfsck_lfsck2name(lfsck), lls->lls_seq,
4256                                lls->lls_lastid, oid);
4257                 }
4258
4259                 lls->lls_lastid = oid;
4260                 lls->lls_dirty = 1;
4261         }
4262
4263         GOTO(unlock, rc = 0);
4264
4265 unlock:
4266         up_write(&com->lc_sem);
4267
4268         return rc;
4269 }
4270
4271 static int lfsck_layout_exec_dir(const struct lu_env *env,
4272                                  struct lfsck_component *com,
4273                                  struct lu_dirent *ent, __u16 type)
4274 {
4275         return 0;
4276 }
4277
4278 static int lfsck_layout_master_post(const struct lu_env *env,
4279                                     struct lfsck_component *com,
4280                                     int result, bool init)
4281 {
4282         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4283         struct lfsck_layout     *lo     = com->lc_file_ram;
4284         int                      rc;
4285         ENTRY;
4286
4287         lfsck_post_generic(env, com, &result);
4288
4289         down_write(&com->lc_sem);
4290         spin_lock(&lfsck->li_lock);
4291         if (!init)
4292                 lo->ll_pos_last_checkpoint =
4293                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
4294
4295         if (result > 0) {
4296                 lo->ll_status = LS_SCANNING_PHASE2;
4297                 lo->ll_flags |= LF_SCANNED_ONCE;
4298                 lo->ll_flags &= ~LF_UPGRADE;
4299                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4300         } else if (result == 0) {
4301                 lo->ll_status = lfsck->li_status;
4302                 if (lo->ll_status == 0)
4303                         lo->ll_status = LS_STOPPED;
4304                 if (lo->ll_status != LS_PAUSED) {
4305                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4306                 }
4307         } else {
4308                 lo->ll_status = LS_FAILED;
4309                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4310         }
4311         spin_unlock(&lfsck->li_lock);
4312
4313         if (!init) {
4314                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4315                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4316                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4317                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4318                 com->lc_new_checked = 0;
4319         }
4320
4321         rc = lfsck_layout_store(env, com);
4322         up_write(&com->lc_sem);
4323
4324         CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
4325                lfsck_lfsck2name(lfsck), rc);
4326
4327         RETURN(rc);
4328 }
4329
4330 static int lfsck_layout_slave_post(const struct lu_env *env,
4331                                    struct lfsck_component *com,
4332                                    int result, bool init)
4333 {
4334         struct lfsck_instance   *lfsck = com->lc_lfsck;
4335         struct lfsck_layout     *lo    = com->lc_file_ram;
4336         int                      rc;
4337         bool                     done  = false;
4338
4339         rc = lfsck_layout_lastid_store(env, com);
4340         if (rc != 0)
4341                 result = rc;
4342
4343         LASSERT(lfsck->li_out_notify != NULL);
4344
4345         down_write(&com->lc_sem);
4346         spin_lock(&lfsck->li_lock);
4347         if (!init)
4348                 lo->ll_pos_last_checkpoint =
4349                                 lfsck->li_pos_checkpoint.lp_oit_cookie;
4350
4351         if (result > 0) {
4352                 lo->ll_status = LS_SCANNING_PHASE2;
4353                 lo->ll_flags |= LF_SCANNED_ONCE;
4354                 if (lo->ll_flags & LF_CRASHED_LASTID) {
4355                         done = true;
4356                         lo->ll_flags &= ~LF_CRASHED_LASTID;
4357
4358                         CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
4359                                "crashed LAST_ID files successfully\n",
4360                                lfsck_lfsck2name(lfsck));
4361                 }
4362                 lo->ll_flags &= ~LF_UPGRADE;
4363                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
4364         } else if (result == 0) {
4365                 lo->ll_status = lfsck->li_status;
4366                 if (lo->ll_status == 0)
4367                         lo->ll_status = LS_STOPPED;
4368                 if (lo->ll_status != LS_PAUSED)
4369                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4370         } else {
4371                 lo->ll_status = LS_FAILED;
4372                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
4373         }
4374         spin_unlock(&lfsck->li_lock);
4375
4376         if (done)
4377                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4378                                      LE_LASTID_REBUILT);
4379
4380         if (!init) {
4381                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4382                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4383                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4384                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4385                 com->lc_new_checked = 0;
4386         }
4387
4388         rc = lfsck_layout_store(env, com);
4389         up_write(&com->lc_sem);
4390
4391         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4392
4393         if (result <= 0)
4394                 lfsck_rbtree_cleanup(env, com);
4395
4396         CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
4397                lfsck_lfsck2name(lfsck), rc);
4398
4399         return rc;
4400 }
4401
4402 static int lfsck_layout_dump(const struct lu_env *env,
4403                              struct lfsck_component *com, struct seq_file *m)
4404 {
4405         struct lfsck_instance   *lfsck = com->lc_lfsck;
4406         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
4407         struct lfsck_layout     *lo    = com->lc_file_ram;
4408         int                      rc;
4409
4410         down_read(&com->lc_sem);
4411         seq_printf(m, "name: lfsck_layout\n"
4412                       "magic: %#x\n"
4413                       "version: %d\n"
4414                       "status: %s\n",
4415                       lo->ll_magic,
4416                       bk->lb_version,
4417                       lfsck_status2names(lo->ll_status));
4418
4419         rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
4420         if (rc < 0)
4421                 goto out;
4422
4423         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
4424         if (rc < 0)
4425                 goto out;
4426
4427         rc = lfsck_time_dump(m, lo->ll_time_last_complete,
4428                              "time_since_last_completed");
4429         if (rc < 0)
4430                 goto out;
4431
4432         rc = lfsck_time_dump(m, lo->ll_time_latest_start,
4433                              "time_since_latest_start");
4434         if (rc < 0)
4435                 goto out;
4436
4437         rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
4438                              "time_since_last_checkpoint");
4439         if (rc < 0)
4440                 goto out;
4441
4442         seq_printf(m, "latest_start_position: "LPU64"\n"
4443                       "last_checkpoint_position: "LPU64"\n"
4444                       "first_failure_position: "LPU64"\n",
4445                       lo->ll_pos_latest_start,
4446                       lo->ll_pos_last_checkpoint,
4447                       lo->ll_pos_first_inconsistent);
4448
4449         seq_printf(m, "success_count: %u\n"
4450                       "repaired_dangling: "LPU64"\n"
4451                       "repaired_unmatched_pair: "LPU64"\n"
4452                       "repaired_multiple_referenced: "LPU64"\n"
4453                       "repaired_orphan: "LPU64"\n"
4454                       "repaired_inconsistent_owner: "LPU64"\n"
4455                       "repaired_others: "LPU64"\n"
4456                       "skipped: "LPU64"\n"
4457                       "failed_phase1: "LPU64"\n"
4458                       "failed_phase2: "LPU64"\n",
4459                       lo->ll_success_count,
4460                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
4461                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4462                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4463                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4464                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4465                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
4466                       lo->ll_objs_skipped,
4467                       lo->ll_objs_failed_phase1,
4468                       lo->ll_objs_failed_phase2);
4469
4470         if (lo->ll_status == LS_SCANNING_PHASE1) {
4471                 __u64 pos;
4472                 const struct dt_it_ops *iops;
4473                 cfs_duration_t duration = cfs_time_current() -
4474                                           lfsck->li_time_last_checkpoint;
4475                 __u64 checked = lo->ll_objs_checked_phase1 +
4476                                 com->lc_new_checked;
4477                 __u64 speed = checked;
4478                 __u64 new_checked = com->lc_new_checked * HZ;
4479                 __u32 rtime = lo->ll_run_time_phase1 +
4480                               cfs_duration_sec(duration + HALF_SEC);
4481
4482                 if (duration != 0)
4483                         do_div(new_checked, duration);
4484                 if (rtime != 0)
4485                         do_div(speed, rtime);
4486                 seq_printf(m, "checked_phase1: "LPU64"\n"
4487                               "checked_phase2: "LPU64"\n"
4488                               "run_time_phase1: %u seconds\n"
4489                               "run_time_phase2: %u seconds\n"
4490                               "average_speed_phase1: "LPU64" items/sec\n"
4491                               "average_speed_phase2: N/A\n"
4492                               "real-time_speed_phase1: "LPU64" items/sec\n"
4493                               "real-time_speed_phase2: N/A\n",
4494                               checked,
4495                               lo->ll_objs_checked_phase2,
4496                               rtime,
4497                               lo->ll_run_time_phase2,
4498                               speed,
4499                               new_checked);
4500
4501                 LASSERT(lfsck->li_di_oit != NULL);
4502
4503                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4504
4505                 /* The low layer otable-based iteration position may NOT
4506                  * exactly match the layout-based directory traversal
4507                  * cookie. Generally, it is not a serious issue. But the
4508                  * caller should NOT make assumption on that. */
4509                 pos = iops->store(env, lfsck->li_di_oit);
4510                 if (!lfsck->li_current_oit_processed)
4511                         pos--;
4512                 seq_printf(m, "current_position: "LPU64"\n", pos);
4513
4514         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4515                 cfs_duration_t duration = cfs_time_current() -
4516                                           lfsck->li_time_last_checkpoint;
4517                 __u64 checked = lo->ll_objs_checked_phase2 +
4518                                 com->lc_new_checked;
4519                 __u64 speed1 = lo->ll_objs_checked_phase1;
4520                 __u64 speed2 = checked;
4521                 __u64 new_checked = com->lc_new_checked * HZ;
4522                 __u32 rtime = lo->ll_run_time_phase2 +
4523                               cfs_duration_sec(duration + HALF_SEC);
4524
4525                 if (duration != 0)
4526                         do_div(new_checked, duration);
4527                 if (lo->ll_run_time_phase1 != 0)
4528                         do_div(speed1, lo->ll_run_time_phase1);
4529                 if (rtime != 0)
4530                         do_div(speed2, rtime);
4531                 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
4532                                 "checked_phase2: "LPU64"\n"
4533                                 "run_time_phase1: %u seconds\n"
4534                                 "run_time_phase2: %u seconds\n"
4535                                 "average_speed_phase1: "LPU64" items/sec\n"
4536                                 "average_speed_phase2: "LPU64" items/sec\n"
4537                                 "real-time_speed_phase1: N/A\n"
4538                                 "real-time_speed_phase2: "LPU64" items/sec\n"
4539                                 "current_position: "DFID"\n",
4540                                 lo->ll_objs_checked_phase1,
4541                                 checked,
4542                                 lo->ll_run_time_phase1,
4543                                 rtime,
4544                                 speed1,
4545                                 speed2,
4546                                 new_checked,
4547                                 PFID(&com->lc_fid_latest_scanned_phase2));
4548                 if (rc <= 0)
4549                         goto out;
4550
4551         } else {
4552                 __u64 speed1 = lo->ll_objs_checked_phase1;
4553                 __u64 speed2 = lo->ll_objs_checked_phase2;
4554
4555                 if (lo->ll_run_time_phase1 != 0)
4556                         do_div(speed1, lo->ll_run_time_phase1);
4557                 if (lo->ll_run_time_phase2 != 0)
4558                         do_div(speed2, lo->ll_run_time_phase2);
4559                 seq_printf(m, "checked_phase1: "LPU64"\n"
4560                            "checked_phase2: "LPU64"\n"
4561                            "run_time_phase1: %u seconds\n"
4562                            "run_time_phase2: %u seconds\n"
4563                            "average_speed_phase1: "LPU64" items/sec\n"
4564                            "average_speed_phase2: "LPU64" objs/sec\n"
4565                            "real-time_speed_phase1: N/A\n"
4566                            "real-time_speed_phase2: N/A\n"
4567                            "current_position: N/A\n",
4568                            lo->ll_objs_checked_phase1,
4569                            lo->ll_objs_checked_phase2,
4570                            lo->ll_run_time_phase1,
4571                            lo->ll_run_time_phase2,
4572                            speed1,
4573                            speed2);
4574         }
4575 out:
4576         up_read(&com->lc_sem);
4577
4578         return rc;
4579 }
4580
4581 static int lfsck_layout_master_double_scan(const struct lu_env *env,
4582                                            struct lfsck_component *com)
4583 {
4584         struct lfsck_layout *lo = com->lc_file_ram;
4585
4586         return lfsck_double_scan_generic(env, com, lo->ll_status);
4587 }
4588
4589 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
4590                                           struct lfsck_component *com)
4591 {
4592         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4593         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4594         struct lfsck_layout             *lo     = com->lc_file_ram;
4595         struct ptlrpc_thread            *thread = &lfsck->li_thread;
4596         int                              rc;
4597         ENTRY;
4598
4599         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
4600                 lfsck_rbtree_cleanup(env, com);
4601                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
4602                 RETURN(0);
4603         }
4604
4605         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
4606                lfsck_lfsck2name(lfsck));
4607
4608         atomic_inc(&lfsck->li_double_scan_count);
4609
4610         com->lc_new_checked = 0;
4611         com->lc_new_scanned = 0;
4612         com->lc_time_last_checkpoint = cfs_time_current();
4613         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4614                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4615
4616         while (1) {
4617                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
4618                                                      NULL, NULL);
4619
4620                 rc = lfsck_layout_slave_query_master(env, com);
4621                 if (list_empty(&llsd->llsd_master_list)) {
4622                         if (unlikely(!thread_is_running(thread)))
4623                                 rc = 0;
4624                         else
4625                                 rc = 1;
4626
4627                         GOTO(done, rc);
4628                 }
4629
4630                 if (rc < 0)
4631                         GOTO(done, rc);
4632
4633                 rc = l_wait_event(thread->t_ctl_waitq,
4634                                   !thread_is_running(thread) ||
4635                                   list_empty(&llsd->llsd_master_list),
4636                                   &lwi);
4637                 if (unlikely(!thread_is_running(thread)))
4638                         GOTO(done, rc = 0);
4639
4640                 if (rc == -ETIMEDOUT)
4641                         continue;
4642
4643                 GOTO(done, rc = (rc < 0 ? rc : 1));
4644         }
4645
4646 done:
4647         rc = lfsck_layout_double_scan_result(env, com, rc);
4648
4649         lfsck_rbtree_cleanup(env, com);
4650         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
4651         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
4652                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
4653
4654         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
4655                "status %d: rc = %d\n",
4656                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
4657
4658         return rc;
4659 }
4660
4661 static void lfsck_layout_master_data_release(const struct lu_env *env,
4662                                              struct lfsck_component *com)
4663 {
4664         struct lfsck_assistant_data     *lad    = com->lc_data;
4665         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4666         struct lfsck_tgt_descs          *ltds;
4667         struct lfsck_tgt_desc           *ltd;
4668         struct lfsck_tgt_desc           *next;
4669
4670         LASSERT(lad != NULL);
4671         LASSERT(thread_is_init(&lad->lad_thread) ||
4672                 thread_is_stopped(&lad->lad_thread));
4673         LASSERT(list_empty(&lad->lad_req_list));
4674
4675         com->lc_data = NULL;
4676
4677         ltds = &lfsck->li_ost_descs;
4678         spin_lock(&ltds->ltd_lock);
4679         list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase1_list,
4680                                  ltd_layout_phase_list) {
4681                 list_del_init(&ltd->ltd_layout_phase_list);
4682         }
4683         list_for_each_entry_safe(ltd, next, &lad->lad_ost_phase2_list,
4684                                  ltd_layout_phase_list) {
4685                 list_del_init(&ltd->ltd_layout_phase_list);
4686         }
4687         list_for_each_entry_safe(ltd, next, &lad->lad_ost_list,
4688                                  ltd_layout_list) {
4689                 list_del_init(&ltd->ltd_layout_list);
4690         }
4691         spin_unlock(&ltds->ltd_lock);
4692
4693         ltds = &lfsck->li_mdt_descs;
4694         spin_lock(&ltds->ltd_lock);
4695         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
4696                                  ltd_layout_phase_list) {
4697                 list_del_init(&ltd->ltd_layout_phase_list);
4698         }
4699         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
4700                                  ltd_layout_phase_list) {
4701                 list_del_init(&ltd->ltd_layout_phase_list);
4702         }
4703         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
4704                                  ltd_layout_list) {
4705                 list_del_init(&ltd->ltd_layout_list);
4706         }
4707         spin_unlock(&ltds->ltd_lock);
4708
4709         OBD_FREE_PTR(lad);
4710 }
4711
4712 static void lfsck_layout_slave_data_release(const struct lu_env *env,
4713                                             struct lfsck_component *com)
4714 {
4715         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4716         struct lfsck_layout_seq          *lls;
4717         struct lfsck_layout_seq          *next;
4718         struct lfsck_layout_slave_target *llst;
4719         struct lfsck_layout_slave_target *tmp;
4720
4721         LASSERT(llsd != NULL);
4722
4723         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
4724                                      lls_list) {
4725                 list_del_init(&lls->lls_list);
4726                 lfsck_object_put(env, lls->lls_lastid_obj);
4727                 OBD_FREE_PTR(lls);
4728         }
4729
4730         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
4731                                  llst_list) {
4732                 list_del_init(&llst->llst_list);
4733                 OBD_FREE_PTR(llst);
4734         }
4735
4736         lfsck_rbtree_cleanup(env, com);
4737         com->lc_data = NULL;
4738         OBD_FREE_PTR(llsd);
4739 }
4740
/**
 * Quit handler of the layout LFSCK slave.
 *
 * The only work done here is releasing the rbtree through
 * lfsck_rbtree_cleanup().
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the layout LFSCK component
 */
static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
4746
4747 static int lfsck_layout_master_in_notify(const struct lu_env *env,
4748                                          struct lfsck_component *com,
4749                                          struct lfsck_request *lr)
4750 {
4751         struct lfsck_instance           *lfsck = com->lc_lfsck;
4752         struct lfsck_layout             *lo    = com->lc_file_ram;
4753         struct lfsck_assistant_data     *lad   = com->lc_data;
4754         struct lfsck_tgt_descs          *ltds;
4755         struct lfsck_tgt_desc           *ltd;
4756         bool                             fail  = false;
4757         ENTRY;
4758
4759         if (lr->lr_event == LE_PAIRS_VERIFY) {
4760                 int rc;
4761
4762                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
4763                                                      &lr->lr_fid2);
4764
4765                 RETURN(rc);
4766         }
4767
4768         CDEBUG(D_LFSCK, "%s: layout LFSCK master handles notify %u "
4769                "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
4770                lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4771                lr->lr_index, lr->lr_status);
4772
4773         if (lr->lr_event != LE_PHASE1_DONE &&
4774             lr->lr_event != LE_PHASE2_DONE &&
4775             lr->lr_event != LE_PEER_EXIT)
4776                 RETURN(-EINVAL);
4777
4778         if (lr->lr_flags & LEF_FROM_OST)
4779                 ltds = &lfsck->li_ost_descs;
4780         else
4781                 ltds = &lfsck->li_mdt_descs;
4782         spin_lock(&ltds->ltd_lock);
4783         ltd = LTD_TGT(ltds, lr->lr_index);
4784         if (ltd == NULL) {
4785                 spin_unlock(&ltds->ltd_lock);
4786
4787                 RETURN(-ENXIO);
4788         }
4789
4790         list_del_init(&ltd->ltd_layout_phase_list);
4791         switch (lr->lr_event) {
4792         case LE_PHASE1_DONE:
4793                 if (lr->lr_status <= 0) {
4794                         ltd->ltd_layout_done = 1;
4795                         list_del_init(&ltd->ltd_layout_list);
4796                         lo->ll_flags |= LF_INCOMPLETE;
4797                         fail = true;
4798                         break;
4799                 }
4800
4801                 if (lr->lr_flags & LEF_FROM_OST) {
4802                         if (list_empty(&ltd->ltd_layout_list))
4803                                 list_add_tail(&ltd->ltd_layout_list,
4804                                               &lad->lad_ost_list);
4805                         list_add_tail(&ltd->ltd_layout_phase_list,
4806                                       &lad->lad_ost_phase2_list);
4807                 } else {
4808                         if (list_empty(&ltd->ltd_layout_list))
4809                                 list_add_tail(&ltd->ltd_layout_list,
4810                                               &lad->lad_mdt_list);
4811                         list_add_tail(&ltd->ltd_layout_phase_list,
4812                                       &lad->lad_mdt_phase2_list);
4813                 }
4814                 break;
4815         case LE_PHASE2_DONE:
4816                 ltd->ltd_layout_done = 1;
4817                 list_del_init(&ltd->ltd_layout_list);
4818                 break;
4819         case LE_PEER_EXIT:
4820                 fail = true;
4821                 ltd->ltd_layout_done = 1;
4822                 list_del_init(&ltd->ltd_layout_list);
4823                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
4824                         lo->ll_flags |= LF_INCOMPLETE;
4825                 break;
4826         default:
4827                 break;
4828         }
4829         spin_unlock(&ltds->ltd_lock);
4830
4831         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4832                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4833
4834                 memset(stop, 0, sizeof(*stop));
4835                 stop->ls_status = lr->lr_status;
4836                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4837                 lfsck_stop(env, lfsck->li_bottom, stop);
4838         } else if (lfsck_phase2_next_ready(lad)) {
4839                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
4840         }
4841
4842         RETURN(0);
4843 }
4844
4845 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
4846                                         struct lfsck_component *com,
4847                                         struct lfsck_request *lr)
4848 {
4849         struct lfsck_instance            *lfsck = com->lc_lfsck;
4850         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4851         struct lfsck_layout_slave_target *llst;
4852         int                               rc;
4853         ENTRY;
4854
4855         switch (lr->lr_event) {
4856         case LE_FID_ACCESSED:
4857                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
4858                 RETURN(0);
4859         case LE_CONDITIONAL_DESTROY:
4860                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
4861                 RETURN(rc);
4862         case LE_PAIRS_VERIFY: {
4863                 lr->lr_status = LPVS_INIT;
4864                 /* Firstly, if the MDT-object which is claimed via OST-object
4865                  * local stored PFID xattr recognizes the OST-object, then it
4866                  * must be that the client given PFID is wrong. */
4867                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
4868                                                     &lr->lr_fid3);
4869                 if (rc <= 0)
4870                         RETURN(0);
4871
4872                 lr->lr_status = LPVS_INCONSISTENT;
4873                 /* The OST-object local stored PFID xattr is stale. We need to
4874                  * check whether the MDT-object that is claimed via the client
4875                  * given PFID information recognizes the OST-object or not. If
4876                  * matches, then need to update the OST-object's PFID xattr. */
4877                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
4878                                                     &lr->lr_fid2);
4879                 /* For rc < 0 case:
4880                  * We are not sure whether the client given PFID information
4881                  * is correct or not, do nothing to avoid improper fixing.
4882                  *
4883                  * For rc > 0 case:
4884                  * The client given PFID information is also invalid, we can
4885                  * NOT fix the OST-object inconsistency.
4886                  */
4887                 if (rc != 0)
4888                         RETURN(rc);
4889
4890                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
4891                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
4892
4893                 RETURN(rc);
4894         }
4895         case LE_PHASE2_DONE:
4896         case LE_PEER_EXIT:
4897                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
4898                        "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
4899                        lr->lr_event, lr->lr_index, lr->lr_status);
4900                 break;
4901         default:
4902                 RETURN(-EINVAL);
4903         }
4904
4905         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
4906         if (llst == NULL)
4907                 RETURN(-ENXIO);
4908
4909         lfsck_layout_llst_put(llst);
4910         if (list_empty(&llsd->llsd_master_list))
4911                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
4912
4913         if (lr->lr_event == LE_PEER_EXIT &&
4914             (lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT ||
4915              (list_empty(&llsd->llsd_master_list) &&
4916               (lr->lr_status == LS_STOPPED ||
4917                lr->lr_status == LS_CO_STOPPED)))) {
4918                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4919
4920                 memset(stop, 0, sizeof(*stop));
4921                 stop->ls_status = lr->lr_status;
4922                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4923                 lfsck_stop(env, lfsck->li_bottom, stop);
4924         }
4925
4926         RETURN(0);
4927 }
4928
4929 static int lfsck_layout_query(const struct lu_env *env,
4930                               struct lfsck_component *com)
4931 {
4932         struct lfsck_layout *lo = com->lc_file_ram;
4933
4934         return lo->ll_status;
4935 }
4936
4937 /* with lfsck::li_lock held */
4938 static int lfsck_layout_slave_join(const struct lu_env *env,
4939                                    struct lfsck_component *com,
4940                                    struct lfsck_start_param *lsp)
4941 {
4942         struct lfsck_instance            *lfsck = com->lc_lfsck;
4943         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4944         struct lfsck_layout_slave_target *llst;
4945         struct lfsck_start               *start = lsp->lsp_start;
4946         int                               rc    = 0;
4947         ENTRY;
4948
4949         if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
4950                 RETURN(0);
4951
4952         if (!lsp->lsp_index_valid)
4953                 RETURN(-EINVAL);
4954
4955         /* If someone is running the LFSCK without orphan handling,
4956          * it will not maintain the object accessing rbtree. So we
4957          * cannot join it for orphan handling. */
4958         if (!llsd->llsd_rbtree_valid)
4959                 RETURN(-EBUSY);
4960
4961         spin_unlock(&lfsck->li_lock);
4962         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4963         spin_lock(&lfsck->li_lock);
4964         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
4965                 spin_unlock(&lfsck->li_lock);
4966                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
4967                                                       true);
4968                 if (llst != NULL)
4969                         lfsck_layout_llst_put(llst);
4970                 spin_lock(&lfsck->li_lock);
4971                 rc = -EAGAIN;
4972         }
4973
4974         RETURN(rc);
4975 }
4976
4977 static struct lfsck_operations lfsck_layout_master_ops = {
4978         .lfsck_reset            = lfsck_layout_reset,
4979         .lfsck_fail             = lfsck_layout_fail,
4980         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
4981         .lfsck_prep             = lfsck_layout_master_prep,
4982         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
4983         .lfsck_exec_dir         = lfsck_layout_exec_dir,
4984         .lfsck_post             = lfsck_layout_master_post,
4985         .lfsck_dump             = lfsck_layout_dump,
4986         .lfsck_double_scan      = lfsck_layout_master_double_scan,
4987         .lfsck_data_release     = lfsck_layout_master_data_release,
4988         .lfsck_quit             = lfsck_quit_generic,
4989         .lfsck_in_notify        = lfsck_layout_master_in_notify,
4990         .lfsck_query            = lfsck_layout_query,
4991 };
4992
4993 static struct lfsck_operations lfsck_layout_slave_ops = {
4994         .lfsck_reset            = lfsck_layout_reset,
4995         .lfsck_fail             = lfsck_layout_fail,
4996         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
4997         .lfsck_prep             = lfsck_layout_slave_prep,
4998         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
4999         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5000         .lfsck_post             = lfsck_layout_slave_post,
5001         .lfsck_dump             = lfsck_layout_dump,
5002         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5003         .lfsck_data_release     = lfsck_layout_slave_data_release,
5004         .lfsck_quit             = lfsck_layout_slave_quit,
5005         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5006         .lfsck_query            = lfsck_layout_query,
5007         .lfsck_join             = lfsck_layout_slave_join,
5008 };
5009
5010 static void lfsck_layout_assistant_fill_pos(const struct lu_env *env,
5011                                             struct lfsck_component *com,
5012                                             struct lfsck_position *pos)
5013 {
5014         struct lfsck_assistant_data     *lad = com->lc_data;
5015         struct lfsck_layout_req         *llr;
5016
5017         if (list_empty(&lad->lad_req_list))
5018                 return;
5019
5020         llr = list_entry(lad->lad_req_list.next,
5021                          struct lfsck_layout_req,
5022                          llr_lar.lar_list);
5023         pos->lp_oit_cookie = llr->llr_parent->llo_cookie - 1;
5024 }
5025
5026 struct lfsck_assistant_operations lfsck_layout_assistant_ops = {
5027         .la_handler_p1          = lfsck_layout_assistant_handler_p1,
5028         .la_handler_p2          = lfsck_layout_assistant_handler_p2,
5029         .la_fill_pos            = lfsck_layout_assistant_fill_pos,
5030         .la_double_scan_result  = lfsck_layout_double_scan_result,
5031         .la_req_fini            = lfsck_layout_assistant_req_fini,
5032 };
5033
5034 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5035 {
5036         struct lfsck_component  *com;
5037         struct lfsck_layout     *lo;
5038         struct dt_object        *root = NULL;
5039         struct dt_object        *obj;
5040         int                      rc;
5041         ENTRY;
5042
5043         OBD_ALLOC_PTR(com);
5044         if (com == NULL)
5045                 RETURN(-ENOMEM);
5046
5047         INIT_LIST_HEAD(&com->lc_link);
5048         INIT_LIST_HEAD(&com->lc_link_dir);
5049         init_rwsem(&com->lc_sem);
5050         atomic_set(&com->lc_ref, 1);
5051         com->lc_lfsck = lfsck;
5052         com->lc_type = LFSCK_TYPE_LAYOUT;
5053         if (lfsck->li_master) {
5054                 com->lc_ops = &lfsck_layout_master_ops;
5055                 com->lc_data = lfsck_assistant_data_init(
5056                                 &lfsck_layout_assistant_ops,
5057                                 "lfsck_layout");
5058                 if (com->lc_data == NULL)
5059                         GOTO(out, rc = -ENOMEM);
5060         } else {
5061                 struct lfsck_layout_slave_data *llsd;
5062
5063                 com->lc_ops = &lfsck_layout_slave_ops;
5064                 OBD_ALLOC_PTR(llsd);
5065                 if (llsd == NULL)
5066                         GOTO(out, rc = -ENOMEM);
5067
5068                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5069                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5070                 spin_lock_init(&llsd->llsd_lock);
5071                 llsd->llsd_rb_root = RB_ROOT;
5072                 rwlock_init(&llsd->llsd_rb_lock);
5073                 com->lc_data = llsd;
5074         }
5075         com->lc_file_size = sizeof(*lo);
5076         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5077         if (com->lc_file_ram == NULL)
5078                 GOTO(out, rc = -ENOMEM);
5079
5080         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5081         if (com->lc_file_disk == NULL)
5082                 GOTO(out, rc = -ENOMEM);
5083
5084         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5085         if (IS_ERR(root))
5086                 GOTO(out, rc = PTR_ERR(root));
5087
5088         if (unlikely(!dt_try_as_dir(env, root)))
5089                 GOTO(out, rc = -ENOTDIR);
5090
5091         obj = local_file_find_or_create(env, lfsck->li_los, root,
5092                                         lfsck_layout_name,
5093                                         S_IFREG | S_IRUGO | S_IWUSR);
5094         if (IS_ERR(obj))
5095                 GOTO(out, rc = PTR_ERR(obj));
5096
5097         com->lc_obj = obj;
5098         rc = lfsck_layout_load(env, com);
5099         if (rc > 0)
5100                 rc = lfsck_layout_reset(env, com, true);
5101         else if (rc == -ENOENT)
5102                 rc = lfsck_layout_init(env, com);
5103
5104         if (rc != 0)
5105                 GOTO(out, rc);
5106
5107         lo = com->lc_file_ram;
5108         switch (lo->ll_status) {
5109         case LS_INIT:
5110         case LS_COMPLETED:
5111         case LS_FAILED:
5112         case LS_STOPPED:
5113         case LS_PARTIAL:
5114                 spin_lock(&lfsck->li_lock);
5115                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5116                 spin_unlock(&lfsck->li_lock);
5117                 break;
5118         default:
5119                 CERROR("%s: unknown lfsck_layout status %d\n",
5120                        lfsck_lfsck2name(lfsck), lo->ll_status);
5121                 /* fall through */
5122         case LS_SCANNING_PHASE1:
5123         case LS_SCANNING_PHASE2:
5124                 /* No need to store the status to disk right now.
5125                  * If the system crashed before the status stored,
5126                  * it will be loaded back when next time. */
5127                 lo->ll_status = LS_CRASHED;
5128                 lo->ll_flags |= LF_INCOMPLETE;
5129                 /* fall through */
5130         case LS_PAUSED:
5131         case LS_CRASHED:
5132         case LS_CO_FAILED:
5133         case LS_CO_STOPPED:
5134         case LS_CO_PAUSED:
5135                 spin_lock(&lfsck->li_lock);
5136                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5137                 spin_unlock(&lfsck->li_lock);
5138                 break;
5139         }
5140
5141         if (lo->ll_flags & LF_CRASHED_LASTID) {
5142                 LASSERT(lfsck->li_out_notify != NULL);
5143
5144                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5145                                      LE_LASTID_REBUILDING);
5146         }
5147
5148         GOTO(out, rc = 0);
5149
5150 out:
5151         if (root != NULL && !IS_ERR(root))
5152                 lu_object_put(env, &root->do_lu);
5153
5154         if (rc != 0) {
5155                 lfsck_component_cleanup(env, com);
5156                 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
5157                        lfsck_lfsck2name(lfsck), rc);
5158         }
5159
5160         return rc;
5161 }
5162
5163 struct lfsck_orphan_it {
5164         struct lfsck_component           *loi_com;
5165         struct lfsck_rbtree_node         *loi_lrn;
5166         struct lfsck_layout_slave_target *loi_llst;
5167         struct lu_fid                     loi_key;
5168         struct lu_orphan_rec              loi_rec;
5169         __u64                             loi_hash;
5170         unsigned int                      loi_over:1;
5171 };
5172
5173 static int lfsck_fid_match_idx(const struct lu_env *env,
5174                                struct lfsck_instance *lfsck,
5175                                const struct lu_fid *fid, int idx)
5176 {
5177         struct seq_server_site  *ss;
5178         struct lu_server_fld    *sf;
5179         struct lu_seq_range      range  = { 0 };
5180         int                      rc;
5181
5182         /* All abnormal cases will be returned to MDT0. */
5183         if (!fid_is_norm(fid)) {
5184                 if (idx == 0)
5185                         return 1;
5186
5187                 return 0;
5188         }
5189
5190         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5191         if (unlikely(ss == NULL))
5192                 return -ENOTCONN;
5193
5194         sf = ss->ss_server_fld;
5195         LASSERT(sf != NULL);
5196
5197         fld_range_set_any(&range);
5198         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
5199         if (rc != 0)
5200                 return rc;
5201
5202         if (!fld_range_is_mdt(&range))
5203                 return -EINVAL;
5204
5205         if (range.lsr_index == idx)
5206                 return 1;
5207
5208         return 0;
5209 }
5210
5211 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5212                                         struct dt_device *dev,
5213                                         struct dt_object *obj)
5214 {
5215         struct thandle *handle;
5216         int             rc;
5217         ENTRY;
5218
5219         handle = dt_trans_create(env, dev);
5220         if (IS_ERR(handle))
5221                 RETURN_EXIT;
5222
5223         rc = dt_declare_ref_del(env, obj, handle);
5224         if (rc != 0)
5225                 GOTO(stop, rc);
5226
5227         rc = dt_declare_destroy(env, obj, handle);
5228         if (rc != 0)
5229                 GOTO(stop, rc);
5230
5231         rc = dt_trans_start_local(env, dev, handle);
5232         if (rc != 0)
5233                 GOTO(stop, rc);
5234
5235         dt_write_lock(env, obj, 0);
5236         rc = dt_ref_del(env, obj, handle);
5237         if (rc == 0)
5238                 rc = dt_destroy(env, obj, handle);
5239         dt_write_unlock(env, obj);
5240
5241         GOTO(stop, rc);
5242
5243 stop:
5244         dt_trans_stop(env, dev, handle);
5245
5246         CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
5247                PFID(lfsck_dto2fid(obj)), rc);
5248
5249         RETURN_EXIT;
5250 }
5251
/**
 * Lookup method of the orphan index: not supported.
 *
 * All arguments are ignored.
 *
 * \retval              -EOPNOTSUPP always
 */
static int lfsck_orphan_index_lookup(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct lustre_capa *capa)
{
        /* Lookup by key is not implemented for this index. */
        return -EOPNOTSUPP;
}
5260
/**
 * Declare-insert method of the orphan index: not supported.
 *
 * All arguments are ignored.
 *
 * \retval              -EOPNOTSUPP always
 */
static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_rec *rec,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        /* Records cannot be inserted into this index. */
        return -EOPNOTSUPP;
}
5269
/**
 * Insert method of the orphan index: not supported.
 *
 * All arguments are ignored.
 *
 * \retval              -EOPNOTSUPP always
 */
static int lfsck_orphan_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa,
                                     int ignore_quota)
{
        /* Records cannot be inserted into this index. */
        return -EOPNOTSUPP;
}
5280
/**
 * Declare-delete handler of the orphan index.
 *
 * The operation is not implemented for this index; every argument is
 * ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        /* Deletion cannot be declared: report it as unsupported. */
        return -EOPNOTSUPP;
}
5288
/**
 * Delete handler of the orphan index.
 *
 * The operation is not implemented for this index; every argument is
 * ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa)
{
        /* Records cannot be deleted: report the operation as unsupported. */
        return -EOPNOTSUPP;
}
5297
5298 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5299                                           struct dt_object *dt,
5300                                           __u32 attr,
5301                                           struct lustre_capa *capa)
5302 {
5303         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
5304         struct lfsck_instance           *lfsck;
5305         struct lfsck_component          *com    = NULL;
5306         struct lfsck_layout_slave_data  *llsd;
5307         struct lfsck_orphan_it          *it     = NULL;
5308         int                              rc     = 0;
5309         ENTRY;
5310
5311         lfsck = lfsck_instance_find(dev, true, false);
5312         if (unlikely(lfsck == NULL))
5313                 RETURN(ERR_PTR(-ENXIO));
5314
5315         com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
5316         if (unlikely(com == NULL))
5317                 GOTO(out, rc = -ENOENT);
5318
5319         llsd = com->lc_data;
5320         if (!llsd->llsd_rbtree_valid)
5321                 GOTO(out, rc = -ESRCH);
5322
5323         OBD_ALLOC_PTR(it);
5324         if (it == NULL)
5325                 GOTO(out, rc = -ENOMEM);
5326
5327         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5328         if (it->loi_llst == NULL)
5329                 GOTO(out, rc = -ENXIO);
5330
5331         if (dev->dd_record_fid_accessed) {
5332                 /* The first iteration against the rbtree, scan the whole rbtree
5333                  * to remove the nodes which do NOT need to be handled. */
5334                 write_lock(&llsd->llsd_rb_lock);
5335                 if (dev->dd_record_fid_accessed) {
5336                         struct rb_node                  *node;
5337                         struct rb_node                  *next;
5338                         struct lfsck_rbtree_node        *lrn;
5339
5340                         /* No need to record the fid accessing anymore. */
5341                         dev->dd_record_fid_accessed = 0;
5342
5343                         node = rb_first(&llsd->llsd_rb_root);
5344                         while (node != NULL) {
5345                                 next = rb_next(node);
5346                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
5347                                                lrn_node);
5348                                 if (atomic_read(&lrn->lrn_known_count) <=
5349                                     atomic_read(&lrn->lrn_accessed_count)) {
5350                                         rb_erase(node, &llsd->llsd_rb_root);
5351                                         lfsck_rbtree_free(lrn);
5352                                 }
5353                                 node = next;
5354                         }
5355                 }
5356                 write_unlock(&llsd->llsd_rb_lock);
5357         }
5358
5359         /* read lock the rbtree when init, and unlock when fini */
5360         read_lock(&llsd->llsd_rb_lock);
5361         it->loi_com = com;
5362         com = NULL;
5363
5364         GOTO(out, rc = 0);
5365
5366 out:
5367         if (com != NULL)
5368                 lfsck_component_put(env, com);
5369
5370         CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
5371                lfsck_lfsck2name(lfsck), rc);
5372
5373         lfsck_instance_put(env, lfsck);
5374         if (rc != 0) {
5375                 if (it != NULL)
5376                         OBD_FREE_PTR(it);
5377
5378                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5379         }
5380
5381         return (struct dt_it *)it;
5382 }
5383
5384 static void lfsck_orphan_it_fini(const struct lu_env *env,
5385                                  struct dt_it *di)
5386 {
5387         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
5388         struct lfsck_component           *com   = it->loi_com;
5389         struct lfsck_layout_slave_data   *llsd;
5390         struct lfsck_layout_slave_target *llst;
5391
5392         if (com != NULL) {
5393                 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
5394                        lfsck_lfsck2name(com->lc_lfsck));
5395
5396                 llsd = com->lc_data;
5397                 read_unlock(&llsd->llsd_rb_lock);
5398                 llst = it->loi_llst;
5399                 LASSERT(llst != NULL);
5400
5401                 /* Save the key and hash for iterate next. */
5402                 llst->llst_fid = it->loi_key;
5403                 llst->llst_hash = it->loi_hash;
5404                 lfsck_layout_llst_put(llst);
5405                 lfsck_component_put(env, com);
5406         }
5407         OBD_FREE_PTR(it);
5408 }
5409
5410 /**
5411  * \retval       +1: the iteration finished
5412  * \retval        0: on success, not finished
5413  * \retval      -ve: on error
5414  */
5415 static int lfsck_orphan_it_next(const struct lu_env *env,
5416                                 struct dt_it *di)
5417 {
5418         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5419         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
5420         struct lu_attr                  *la     = &info->lti_la;
5421         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
5422         struct lu_fid                   *key    = &it->loi_key;
5423         struct lu_orphan_rec            *rec    = &it->loi_rec;
5424         struct lfsck_component          *com    = it->loi_com;
5425         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5426         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5427         struct dt_object                *obj;
5428         struct lfsck_rbtree_node        *lrn;
5429         int                              pos;
5430         int                              rc;
5431         __u32                            save;
5432         __u32                            idx    = it->loi_llst->llst_index;
5433         bool                             exact  = false;
5434         ENTRY;
5435
5436         if (it->loi_over)
5437                 RETURN(1);
5438
5439 again0:
5440         lrn = it->loi_lrn;
5441         if (lrn == NULL) {
5442                 lrn = lfsck_rbtree_search(llsd, key, &exact);
5443                 if (lrn == NULL) {
5444                         it->loi_over = 1;
5445                         RETURN(1);
5446                 }
5447
5448                 it->loi_lrn = lrn;
5449                 if (!exact) {
5450                         key->f_seq = lrn->lrn_seq;
5451                         key->f_oid = lrn->lrn_first_oid;
5452                         key->f_ver = 0;
5453                 }
5454         } else {
5455                 key->f_oid++;
5456                 if (unlikely(key->f_oid == 0)) {
5457                         key->f_seq++;
5458                         it->loi_lrn = NULL;
5459                         goto again0;
5460                 }
5461
5462                 if (key->f_oid >=
5463                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
5464                         it->loi_lrn = NULL;
5465                         goto again0;
5466                 }
5467         }
5468
5469         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5470                      atomic_read(&lrn->lrn_accessed_count))) {
5471                 struct rb_node *next = rb_next(&lrn->lrn_node);
5472
5473                 while (next != NULL) {
5474                         lrn = rb_entry(next, struct lfsck_rbtree_node,
5475                                        lrn_node);
5476                         if (atomic_read(&lrn->lrn_known_count) >
5477                             atomic_read(&lrn->lrn_accessed_count))
5478                                 break;
5479                         next = rb_next(next);
5480                 }
5481
5482                 if (next == NULL) {
5483                         it->loi_over = 1;
5484                         RETURN(1);
5485                 }
5486
5487                 it->loi_lrn = lrn;
5488                 key->f_seq = lrn->lrn_seq;
5489                 key->f_oid = lrn->lrn_first_oid;
5490                 key->f_ver = 0;
5491         }
5492
5493         pos = key->f_oid - lrn->lrn_first_oid;
5494
5495 again1:
5496         pos = find_next_bit(lrn->lrn_known_bitmap,
5497                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
5498         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5499                 key->f_oid = lrn->lrn_first_oid + pos;
5500                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
5501                         key->f_seq++;
5502                         key->f_oid = 0;
5503                 }
5504                 it->loi_lrn = NULL;
5505                 goto again0;
5506         }
5507
5508         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5509                 pos++;
5510                 goto again1;
5511         }
5512
5513         key->f_oid = lrn->lrn_first_oid + pos;
5514         obj = lfsck_object_find(env, lfsck, key);
5515         if (IS_ERR(obj)) {
5516                 rc = PTR_ERR(obj);
5517                 if (rc == -ENOENT) {
5518                         pos++;
5519                         goto again1;
5520                 }
5521                 RETURN(rc);
5522         }
5523
5524         dt_read_lock(env, obj, 0);
5525         if (dt_object_exists(obj) == 0 ||
5526             lfsck_is_dead_obj(obj)) {
5527                 dt_read_unlock(env, obj);
5528                 lfsck_object_put(env, obj);
5529                 pos++;
5530                 goto again1;
5531         }
5532
5533         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
5534         if (rc != 0)
5535                 GOTO(out, rc);
5536
5537         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
5538                           XATTR_NAME_FID, BYPASS_CAPA);
5539         if (rc == -ENODATA) {
5540                 /* For the pre-created OST-object, update the bitmap to avoid
5541                  * others LFSCK (second phase) iteration to touch it again. */
5542                 if (la->la_ctime == 0) {
5543                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
5544                                 atomic_inc(&lrn->lrn_accessed_count);
5545
5546                         /* For the race between repairing dangling referenced
5547                          * MDT-object and unlink the file, it may left orphan
5548                          * OST-object there. Destroy it now! */
5549                         if (unlikely(!(la->la_mode & S_ISUID))) {
5550                                 dt_read_unlock(env, obj);
5551                                 lfsck_layout_destroy_orphan(env,
5552                                                             lfsck->li_bottom,
5553                                                             obj);
5554                                 lfsck_object_put(env, obj);
5555                                 pos++;
5556                                 goto again1;
5557                         }
5558                 } else if (idx == 0) {
5559                         /* If the orphan OST-object has no parent information,
5560                          * regard it as referenced by the MDT-object on MDT0. */
5561                         fid_zero(&rec->lor_fid);
5562                         rec->lor_uid = la->la_uid;
5563                         rec->lor_gid = la->la_gid;
5564                         GOTO(out, rc = 0);
5565                 }
5566
5567                 dt_read_unlock(env, obj);
5568                 lfsck_object_put(env, obj);
5569                 pos++;
5570                 goto again1;
5571         }
5572
5573         if (rc < 0)
5574                 GOTO(out, rc);
5575
5576         if (rc != sizeof(struct filter_fid) &&
5577             rc != sizeof(struct filter_fid_old))
5578                 GOTO(out, rc = -EINVAL);
5579
5580         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
5581         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
5582          * MDT-object's FID::f_ver, instead it is the OST-object index in its
5583          * parent MDT-object's layout EA. */
5584         save = rec->lor_fid.f_stripe_idx;
5585         rec->lor_fid.f_ver = 0;
5586         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
5587         /* If the orphan OST-object does not claim the MDT, then next.
5588          *
5589          * If we do not know whether it matches or not, then return it
5590          * to the MDT for further check. */
5591         if (rc == 0) {
5592                 dt_read_unlock(env, obj);
5593                 lfsck_object_put(env, obj);
5594                 pos++;
5595                 goto again1;
5596         }
5597
5598         rec->lor_fid.f_stripe_idx = save;
5599         rec->lor_uid = la->la_uid;
5600         rec->lor_gid = la->la_gid;
5601
5602         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
5603                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
5604                rec->lor_uid, rec->lor_gid);
5605
5606         GOTO(out, rc = 0);
5607
5608 out:
5609         dt_read_unlock(env, obj);
5610         lfsck_object_put(env, obj);
5611         if (rc == 0)
5612                 it->loi_hash++;
5613
5614         return rc;
5615 }
5616
5617 /**
5618  * \retval       +1: locate to the exactly position
5619  * \retval        0: cannot locate to the exactly position,
5620  *                   call next() to move to a valid position.
5621  * \retval      -ve: on error
5622  */
5623 static int lfsck_orphan_it_get(const struct lu_env *env,
5624                                struct dt_it *di,
5625                                const struct dt_key *key)
5626 {
5627         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
5628         int                      rc;
5629
5630         it->loi_key = *(struct lu_fid *)key;
5631         rc = lfsck_orphan_it_next(env, di);
5632         if (rc == 1)
5633                 return 0;
5634
5635         if (rc == 0)
5636                 return 1;
5637
5638         return rc;
5639 }
5640
/**
 * Counterpart of lfsck_orphan_it_get(); there is nothing to release, so
 * this is a no-op.
 *
 * \param[in] env       execution environment, not used
 * \param[in] di        the iterator, not used
 */
static void lfsck_orphan_it_put(const struct lu_env *env,
                                struct dt_it *di)
{
        /* Intentionally empty. */
}
5645
5646 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
5647                                           const struct dt_it *di)
5648 {
5649         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5650
5651         return (struct dt_key *)&it->loi_key;
5652 }
5653
5654 static int lfsck_orphan_it_key_size(const struct lu_env *env,
5655                                     const struct dt_it *di)
5656 {
5657         return sizeof(struct lu_fid);
5658 }
5659
5660 static int lfsck_orphan_it_rec(const struct lu_env *env,
5661                                const struct dt_it *di,
5662                                struct dt_rec *rec,
5663                                __u32 attr)
5664 {
5665         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5666
5667         *(struct lu_orphan_rec *)rec = it->loi_rec;
5668
5669         return 0;
5670 }
5671
5672 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
5673                                    const struct dt_it *di)
5674 {
5675         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
5676
5677         return it->loi_hash;
5678 }
5679
5680 /**
5681  * \retval       +1: locate to the exactly position
5682  * \retval        0: cannot locate to the exactly position,
5683  *                   call next() to move to a valid position.
5684  * \retval      -ve: on error
5685  */
5686 static int lfsck_orphan_it_load(const struct lu_env *env,
5687                                 const struct dt_it *di,
5688                                 __u64 hash)
5689 {
5690         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
5691         struct lfsck_layout_slave_target *llst = it->loi_llst;
5692         int                               rc;
5693
5694         LASSERT(llst != NULL);
5695
5696         if (hash != llst->llst_hash) {
5697                 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
5698                        "iteration does not match the one when fini "
5699                        LPU64", to be reset.\n",
5700                        lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
5701                        llst->llst_hash);
5702                 fid_zero(&llst->llst_fid);
5703                 llst->llst_hash = 0;
5704         }
5705
5706         it->loi_key = llst->llst_fid;
5707         it->loi_hash = llst->llst_hash;
5708         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
5709         if (rc == 1)
5710                 return 0;
5711
5712         if (rc == 0)
5713                 return 1;
5714
5715         return rc;
5716 }
5717
/**
 * Key-and-record handler of the orphan iterator.
 *
 * Nothing is written to \a key_rec; every argument is ignored.
 *
 * \retval      0: always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
                                   const struct dt_it *di, void *key_rec)
{
        /* Nothing to fill in. */
        return 0;
}
5724
5725 const struct dt_index_operations lfsck_orphan_index_ops = {
5726         .dio_lookup             = lfsck_orphan_index_lookup,
5727         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
5728         .dio_insert             = lfsck_orphan_index_insert,
5729         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
5730         .dio_delete             = lfsck_orphan_index_delete,
5731         .dio_it = {
5732                 .init           = lfsck_orphan_it_init,
5733                 .fini           = lfsck_orphan_it_fini,
5734                 .get            = lfsck_orphan_it_get,
5735                 .put            = lfsck_orphan_it_put,
5736                 .next           = lfsck_orphan_it_next,
5737                 .key            = lfsck_orphan_it_key,
5738                 .key_size       = lfsck_orphan_it_key_size,
5739                 .rec            = lfsck_orphan_it_rec,
5740                 .store          = lfsck_orphan_it_store,
5741                 .load           = lfsck_orphan_it_load,
5742                 .key_rec        = lfsck_orphan_it_key_rec,
5743         }
5744 };