Whamcloud - gitweb
10101ab5e4e4e3d96d691bc03bb9a00ee4b4120e
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
51 #define LFSCK_LAYOUT_MAGIC              0xB173AE14
52
53 static const char lfsck_layout_name[] = "lfsck_layout";
54
55 struct lfsck_layout_seq {
56         struct list_head         lls_list;
57         __u64                    lls_seq;
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
60         struct dt_object        *lls_lastid_obj;
61         unsigned int             lls_dirty:1;
62 };
63
64 struct lfsck_layout_slave_target {
65         /* link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
67         /* The position for next record in the rbtree for iteration. */
68         struct lu_fid           llst_fid;
69         /* Dummy hash for iteration against the rbtree. */
70         __u64                   llst_hash;
71         __u64                   llst_gen;
72         atomic_t                llst_ref;
73         __u32                   llst_index;
74 };
75
76 struct lfsck_layout_slave_data {
77         /* list for lfsck_layout_seq */
78         struct list_head         llsd_seq_list;
79
80         /* list for the masters involve layout verification. */
81         struct list_head         llsd_master_list;
82         spinlock_t               llsd_lock;
83         __u64                    llsd_touch_gen;
84         struct dt_object        *llsd_rb_obj;
85         struct rb_root           llsd_rb_root;
86         rwlock_t                 llsd_rb_lock;
87         unsigned int             llsd_rbtree_valid:1;
88 };
89
90 struct lfsck_layout_object {
91         struct dt_object        *llo_obj;
92         struct lu_attr           llo_attr;
93         atomic_t                 llo_ref;
94         __u16                    llo_gen;
95 };
96
97 struct lfsck_layout_req {
98         struct list_head                 llr_list;
99         struct lfsck_layout_object      *llr_parent;
100         struct dt_object                *llr_child;
101         __u32                            llr_ost_idx;
102         __u32                            llr_lov_idx; /* offset in LOV EA */
103 };
104
105 struct lfsck_layout_master_data {
106         spinlock_t              llmd_lock;
107         struct list_head        llmd_req_list;
108
109         /* list for the ost targets involve layout verification. */
110         struct list_head        llmd_ost_list;
111
112         /* list for the ost targets in phase1 scanning. */
113         struct list_head        llmd_ost_phase1_list;
114
115         /* list for the ost targets in phase1 scanning. */
116         struct list_head        llmd_ost_phase2_list;
117
118         /* list for the mdt targets involve layout verification. */
119         struct list_head        llmd_mdt_list;
120
121         /* list for the mdt targets in phase1 scanning. */
122         struct list_head        llmd_mdt_phase1_list;
123
124         /* list for the mdt targets in phase1 scanning. */
125         struct list_head        llmd_mdt_phase2_list;
126
127         struct ptlrpc_thread    llmd_thread;
128         __u32                   llmd_touch_gen;
129         int                     llmd_prefetched;
130         int                     llmd_assistant_status;
131         int                     llmd_post_result;
132         unsigned int            llmd_to_post:1,
133                                 llmd_to_double_scan:1,
134                                 llmd_in_double_scan:1,
135                                 llmd_exit:1;
136 };
137
/*
 * Bundle of pointers: an export, a LFSCK component and a slave target.
 * NOTE(review): going by the name this is the argument block of an
 * asynchronous request issued by the slave -- confirm at the users.
 */
struct lfsck_layout_slave_async_args {
        struct obd_export *llsaa_exp;
        struct lfsck_component *llsaa_com;
        struct lfsck_layout_slave_target *llsaa_llst;
};
143
144 static struct lfsck_layout_object *
145 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
146                          __u16 gen)
147 {
148         struct lfsck_layout_object *llo;
149         int                         rc;
150
151         OBD_ALLOC_PTR(llo);
152         if (llo == NULL)
153                 return ERR_PTR(-ENOMEM);
154
155         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
156         if (rc != 0) {
157                 OBD_FREE_PTR(llo);
158
159                 return ERR_PTR(rc);
160         }
161
162         lu_object_get(&obj->do_lu);
163         llo->llo_obj = obj;
164         /* The gen can be used to check whether some others have changed the
165          * file layout after LFSCK pre-fetching but before real verification. */
166         llo->llo_gen = gen;
167         atomic_set(&llo->llo_ref, 1);
168
169         return llo;
170 }
171
172 static inline void
173 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
174 {
175         if (atomic_dec_and_test(&llst->llst_ref)) {
176                 LASSERT(list_empty(&llst->llst_list));
177
178                 OBD_FREE_PTR(llst);
179         }
180 }
181
182 static inline int
183 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
184 {
185         struct lfsck_layout_slave_target *llst;
186         struct lfsck_layout_slave_target *tmp;
187         int                               rc   = 0;
188
189         OBD_ALLOC_PTR(llst);
190         if (llst == NULL)
191                 return -ENOMEM;
192
193         INIT_LIST_HEAD(&llst->llst_list);
194         llst->llst_gen = 0;
195         llst->llst_index = index;
196         atomic_set(&llst->llst_ref, 1);
197
198         spin_lock(&llsd->llsd_lock);
199         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
200                 if (tmp->llst_index == index) {
201                         rc = -EALREADY;
202                         break;
203                 }
204         }
205         if (rc == 0)
206                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
207         spin_unlock(&llsd->llsd_lock);
208
209         if (rc != 0)
210                 OBD_FREE_PTR(llst);
211
212         return rc;
213 }
214
215 static inline void
216 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
217                       struct lfsck_layout_slave_target *llst)
218 {
219         bool del = false;
220
221         spin_lock(&llsd->llsd_lock);
222         if (!list_empty(&llst->llst_list)) {
223                 list_del_init(&llst->llst_list);
224                 del = true;
225         }
226         spin_unlock(&llsd->llsd_lock);
227
228         if (del)
229                 lfsck_layout_llst_put(llst);
230 }
231
232 static inline struct lfsck_layout_slave_target *
233 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
234                                __u32 index, bool unlink)
235 {
236         struct lfsck_layout_slave_target *llst;
237
238         spin_lock(&llsd->llsd_lock);
239         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
240                 if (llst->llst_index == index) {
241                         if (unlink)
242                                 list_del_init(&llst->llst_list);
243                         else
244                                 atomic_inc(&llst->llst_ref);
245                         spin_unlock(&llsd->llsd_lock);
246
247                         return llst;
248                 }
249         }
250         spin_unlock(&llsd->llsd_lock);
251
252         return NULL;
253 }
254
255 static inline void lfsck_layout_object_put(const struct lu_env *env,
256                                            struct lfsck_layout_object *llo)
257 {
258         if (atomic_dec_and_test(&llo->llo_ref)) {
259                 lfsck_object_put(env, llo->llo_obj);
260                 OBD_FREE_PTR(llo);
261         }
262 }
263
264 static struct lfsck_layout_req *
265 lfsck_layout_req_init(struct lfsck_layout_object *parent,
266                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
267 {
268         struct lfsck_layout_req *llr;
269
270         OBD_ALLOC_PTR(llr);
271         if (llr == NULL)
272                 return ERR_PTR(-ENOMEM);
273
274         INIT_LIST_HEAD(&llr->llr_list);
275         atomic_inc(&parent->llo_ref);
276         llr->llr_parent = parent;
277         llr->llr_child = child;
278         llr->llr_ost_idx = ost_idx;
279         llr->llr_lov_idx = lov_idx;
280
281         return llr;
282 }
283
284 static inline void lfsck_layout_req_fini(const struct lu_env *env,
285                                          struct lfsck_layout_req *llr)
286 {
287         lu_object_put(env, &llr->llr_child->do_lu);
288         lfsck_layout_object_put(env, llr->llr_parent);
289         OBD_FREE_PTR(llr);
290 }
291
292 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
293 {
294         bool empty = false;
295
296         spin_lock(&llmd->llmd_lock);
297         if (list_empty(&llmd->llmd_req_list))
298                 empty = true;
299         spin_unlock(&llmd->llmd_lock);
300
301         return empty;
302 }
303
304 static int lfsck_layout_get_lovea(const struct lu_env *env,
305                                   struct dt_object *obj, struct lu_buf *buf)
306 {
307         int rc;
308
309 again:
310         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
311         if (rc == -ERANGE) {
312                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
313                                   BYPASS_CAPA);
314                 if (rc <= 0)
315                         return rc;
316
317                 lu_buf_realloc(buf, rc);
318                 if (buf->lb_buf == NULL)
319                         return -ENOMEM;
320
321                 goto again;
322         }
323
324         if (rc == -ENODATA)
325                 rc = 0;
326
327         if (rc <= 0)
328                 return rc;
329
330         if (unlikely(buf->lb_buf == NULL)) {
331                 lu_buf_alloc(buf, rc);
332                 if (buf->lb_buf == NULL)
333                         return -ENOMEM;
334
335                 goto again;
336         }
337
338         return rc;
339 }
340
341 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
342 {
343         __u32 magic;
344         __u32 pattern;
345
346         magic = le32_to_cpu(lmm->lmm_magic);
347         /* If magic crashed, keep it there. Sometime later, during OST-object
348          * orphan handling, if some OST-object(s) back-point to it, it can be
349          * verified and repaired. */
350         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
351                 struct ost_id   oi;
352                 int             rc;
353
354                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
355                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
356                         rc = -EOPNOTSUPP;
357                 else
358                         rc = -EINVAL;
359
360                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
361                        rc == -EINVAL ? "Unknown" : "Unsupported",
362                        magic, POSTID(&oi));
363
364                 return rc;
365         }
366
367         pattern = le32_to_cpu(lmm->lmm_pattern);
368         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
369         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
370                 struct ost_id oi;
371
372                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
373                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
374                        pattern, POSTID(&oi));
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
/* Each bitmap of a lfsck_rbtree_node takes LFSCK_RBTREE_BITMAP_SIZE bytes,
 * so one node covers LFSCK_RBTREE_BITMAP_WIDTH consecutive object IDs (one
 * bit each); LFSCK_RBTREE_BITMAP_MASK extracts the bit index from an OID. */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE * 8)
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
385
386 struct lfsck_rbtree_node {
387         struct rb_node   lrn_node;
388         __u64            lrn_seq;
389         __u32            lrn_first_oid;
390         atomic_t         lrn_known_count;
391         atomic_t         lrn_accessed_count;
392         void            *lrn_known_bitmap;
393         void            *lrn_accessed_bitmap;
394 };
395
396 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
397                                    __u64 seq, __u32 oid)
398 {
399         if (seq < lrn->lrn_seq)
400                 return -1;
401
402         if (seq > lrn->lrn_seq)
403                 return 1;
404
405         if (oid < lrn->lrn_first_oid)
406                 return -1;
407
408         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
409                 return 1;
410
411         return 0;
412 }
413
414 /* The caller should hold llsd->llsd_rb_lock. */
415 static struct lfsck_rbtree_node *
416 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
417                     const struct lu_fid *fid, bool *exact)
418 {
419         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
420         struct rb_node           *prev  = NULL;
421         struct lfsck_rbtree_node *lrn   = NULL;
422         int                       rc    = 0;
423
424         if (exact != NULL)
425                 *exact = true;
426
427         while (node != NULL) {
428                 prev = node;
429                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
430                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
431                 if (rc < 0)
432                         node = node->rb_left;
433                 else if (rc > 0)
434                         node = node->rb_right;
435                 else
436                         return lrn;
437         }
438
439         if (exact == NULL)
440                 return NULL;
441
442         /* If there is no exactly matched one, then to the next valid one. */
443         *exact = false;
444
445         /* The rbtree is empty. */
446         if (rc == 0)
447                 return NULL;
448
449         if (rc < 0)
450                 return lrn;
451
452         node = rb_next(prev);
453
454         /* The end of the rbtree. */
455         if (node == NULL)
456                 return NULL;
457
458         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
459
460         return lrn;
461 }
462
463 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
464                                                   const struct lu_fid *fid)
465 {
466         struct lfsck_rbtree_node *lrn;
467
468         OBD_ALLOC_PTR(lrn);
469         if (lrn == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
473         if (lrn->lrn_known_bitmap == NULL) {
474                 OBD_FREE_PTR(lrn);
475
476                 return ERR_PTR(-ENOMEM);
477         }
478
479         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
480         if (lrn->lrn_accessed_bitmap == NULL) {
481                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         RB_CLEAR_NODE(&lrn->lrn_node);
488         lrn->lrn_seq = fid_seq(fid);
489         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
490         atomic_set(&lrn->lrn_known_count, 0);
491         atomic_set(&lrn->lrn_accessed_count, 0);
492
493         return lrn;
494 }
495
496 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
497 {
498         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
499         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
500         OBD_FREE_PTR(lrn);
501 }
502
503 /* The caller should hold lock. */
504 static struct lfsck_rbtree_node *
505 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
506                     struct lfsck_rbtree_node *lrn)
507 {
508         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
509         struct rb_node            *parent = NULL;
510         struct lfsck_rbtree_node  *tmp;
511         int                        rc;
512
513         while (*pos != NULL) {
514                 parent = *pos;
515                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
516                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
517                 if (rc < 0)
518                         pos = &(*pos)->rb_left;
519                 else if (rc > 0)
520                         pos = &(*pos)->rb_right;
521                 else
522                         return tmp;
523         }
524
525         rb_link_node(&lrn->lrn_node, parent, pos);
526         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
527
528         return lrn;
529 }
530
531 extern const struct dt_index_operations lfsck_orphan_index_ops;
532
533 static int lfsck_rbtree_setup(const struct lu_env *env,
534                               struct lfsck_component *com)
535 {
536         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
537         struct lfsck_instance           *lfsck  = com->lc_lfsck;
538         struct dt_device                *dev    = lfsck->li_bottom;
539         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
540         struct dt_object                *obj;
541
542         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
543         fid->f_oid = lfsck_dev_idx(dev);
544         fid->f_ver = 0;
545         obj = dt_locate(env, dev, fid);
546         if (IS_ERR(obj))
547                 RETURN(PTR_ERR(obj));
548
549         /* Generate an in-RAM object to stand for the layout rbtree.
550          * Scanning the layout rbtree will be via the iteration over
551          * the object. In the future, the rbtree may be written onto
552          * disk with the object.
553          *
554          * Mark the object to be as exist. */
555         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
556         obj->do_index_ops = &lfsck_orphan_index_ops;
557         llsd->llsd_rb_obj = obj;
558         llsd->llsd_rbtree_valid = 1;
559         dev->dd_record_fid_accessed = 1;
560
561         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
562                lfsck_lfsck2name(lfsck));
563
564         return 0;
565 }
566
567 static void lfsck_rbtree_cleanup(const struct lu_env *env,
568                                  struct lfsck_component *com)
569 {
570         struct lfsck_instance           *lfsck = com->lc_lfsck;
571         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
572         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
573         struct rb_node                  *next;
574         struct lfsck_rbtree_node        *lrn;
575
576         lfsck->li_bottom->dd_record_fid_accessed = 0;
577         /* Invalid the rbtree, then no others will use it. */
578         write_lock(&llsd->llsd_rb_lock);
579         llsd->llsd_rbtree_valid = 0;
580         write_unlock(&llsd->llsd_rb_lock);
581
582         while (node != NULL) {
583                 next = rb_next(node);
584                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
585                 rb_erase(node, &llsd->llsd_rb_root);
586                 lfsck_rbtree_free(lrn);
587                 node = next;
588         }
589
590         if (llsd->llsd_rb_obj != NULL) {
591                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
592                 llsd->llsd_rb_obj = NULL;
593         }
594
595         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
596                lfsck_lfsck2name(lfsck));
597 }
598
599 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
600                                        struct lfsck_component *com,
601                                        const struct lu_fid *fid,
602                                        bool accessed)
603 {
604         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
605         struct lfsck_rbtree_node        *lrn;
606         bool                             insert = false;
607         int                              idx;
608         int                              rc     = 0;
609         ENTRY;
610
611         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
612                 RETURN_EXIT;
613
614         if (!fid_is_idif(fid) && !fid_is_norm(fid))
615                 RETURN_EXIT;
616
617         read_lock(&llsd->llsd_rb_lock);
618         if (!llsd->llsd_rbtree_valid)
619                 GOTO(unlock, rc = 0);
620
621         lrn = lfsck_rbtree_search(llsd, fid, NULL);
622         if (lrn == NULL) {
623                 struct lfsck_rbtree_node *tmp;
624
625                 LASSERT(!insert);
626
627                 read_unlock(&llsd->llsd_rb_lock);
628                 tmp = lfsck_rbtree_new(env, fid);
629                 if (IS_ERR(tmp))
630                         GOTO(out, rc = PTR_ERR(tmp));
631
632                 insert = true;
633                 write_lock(&llsd->llsd_rb_lock);
634                 if (!llsd->llsd_rbtree_valid) {
635                         lfsck_rbtree_free(tmp);
636                         GOTO(unlock, rc = 0);
637                 }
638
639                 lrn = lfsck_rbtree_insert(llsd, tmp);
640                 if (lrn != tmp)
641                         lfsck_rbtree_free(tmp);
642         }
643
644         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
645         /* Any accessed object must be a known object. */
646         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
647                 atomic_inc(&lrn->lrn_known_count);
648         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
649                 atomic_inc(&lrn->lrn_accessed_count);
650
651         GOTO(unlock, rc = 0);
652
653 unlock:
654         if (insert)
655                 write_unlock(&llsd->llsd_rb_lock);
656         else
657                 read_unlock(&llsd->llsd_rb_lock);
658 out:
659         if (rc != 0 && accessed) {
660                 struct lfsck_layout *lo = com->lc_file_ram;
661
662                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
663                        "bitmap, and will cause incorrect LFSCK OST-object "
664                        "handling, so disable it to cancel orphan handling "
665                        "for related device. rc = %d\n",
666                        lfsck_lfsck2name(com->lc_lfsck), rc);
667
668                 lo->ll_flags |= LF_INCOMPLETE;
669                 lfsck_rbtree_cleanup(env, com);
670         }
671 }
672
673 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
674                                    const struct lfsck_layout *src)
675 {
676         int i;
677
678         des->ll_magic = le32_to_cpu(src->ll_magic);
679         des->ll_status = le32_to_cpu(src->ll_status);
680         des->ll_flags = le32_to_cpu(src->ll_flags);
681         des->ll_success_count = le32_to_cpu(src->ll_success_count);
682         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
683         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
684         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
685         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
686         des->ll_time_last_checkpoint =
687                                 le64_to_cpu(src->ll_time_last_checkpoint);
688         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
689         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
690         des->ll_pos_first_inconsistent =
691                         le64_to_cpu(src->ll_pos_first_inconsistent);
692         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
693         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
694         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
695         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
696         for (i = 0; i < LLIT_MAX; i++)
697                 des->ll_objs_repaired[i] =
698                                 le64_to_cpu(src->ll_objs_repaired[i]);
699         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
700 }
701
702 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
703                                    const struct lfsck_layout *src)
704 {
705         int i;
706
707         des->ll_magic = cpu_to_le32(src->ll_magic);
708         des->ll_status = cpu_to_le32(src->ll_status);
709         des->ll_flags = cpu_to_le32(src->ll_flags);
710         des->ll_success_count = cpu_to_le32(src->ll_success_count);
711         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
712         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
713         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
714         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
715         des->ll_time_last_checkpoint =
716                                 cpu_to_le64(src->ll_time_last_checkpoint);
717         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
718         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
719         des->ll_pos_first_inconsistent =
720                         cpu_to_le64(src->ll_pos_first_inconsistent);
721         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
722         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
723         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
724         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
725         for (i = 0; i < LLIT_MAX; i++)
726                 des->ll_objs_repaired[i] =
727                                 cpu_to_le64(src->ll_objs_repaired[i]);
728         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
729 }
730
731 /**
732  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
733  * \retval 0: succeed.
734  * \retval -ve: failed cases.
735  */
736 static int lfsck_layout_load(const struct lu_env *env,
737                              struct lfsck_component *com)
738 {
739         struct lfsck_layout             *lo     = com->lc_file_ram;
740         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
741         ssize_t                          size   = com->lc_file_size;
742         loff_t                           pos    = 0;
743         int                              rc;
744
745         rc = dbo->dbo_read(env, com->lc_obj,
746                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
747                            BYPASS_CAPA);
748         if (rc == 0) {
749                 return -ENOENT;
750         } else if (rc < 0) {
751                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
752                        lfsck_lfsck2name(com->lc_lfsck), rc);
753                 return rc;
754         } else if (rc != size) {
755                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
756                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
757                 return 1;
758         }
759
760         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
761         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
762                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
763                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
764                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
765                 return 1;
766         }
767
768         return 0;
769 }
770
771 static int lfsck_layout_store(const struct lu_env *env,
772                               struct lfsck_component *com)
773 {
774         struct dt_object         *obj           = com->lc_obj;
775         struct lfsck_instance    *lfsck         = com->lc_lfsck;
776         struct lfsck_layout      *lo            = com->lc_file_disk;
777         struct thandle           *handle;
778         ssize_t                   size          = com->lc_file_size;
779         loff_t                    pos           = 0;
780         int                       rc;
781         ENTRY;
782
783         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
784         handle = dt_trans_create(env, lfsck->li_bottom);
785         if (IS_ERR(handle))
786                 GOTO(log, rc = PTR_ERR(handle));
787
788         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
789                                      pos, handle);
790         if (rc != 0)
791                 GOTO(out, rc);
792
793         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
794         if (rc != 0)
795                 GOTO(out, rc);
796
797         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
798                              handle);
799
800         GOTO(out, rc);
801
802 out:
803         dt_trans_stop(env, lfsck->li_bottom, handle);
804
805 log:
806         if (rc != 0)
807                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
808                        lfsck_lfsck2name(lfsck), rc);
809         return rc;
810 }
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         if (bk->lb_param & LPF_DRYRUN)
908                 return 0;
909
910         memset(la, 0, sizeof(*la));
911         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
912         la->la_valid = LA_MODE | LA_UID | LA_GID;
913         dof->dof_type = dt_mode_to_dft(S_IFREG);
914
915         th = dt_trans_create(env, dt);
916         if (IS_ERR(th))
917                 GOTO(log, rc = PTR_ERR(th));
918
919         rc = dt_declare_create(env, obj, la, NULL, dof, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         rc = dt_declare_record_write(env, obj,
924                                      lfsck_buf_get(env, &lastid,
925                                                    sizeof(lastid)),
926                                      pos, th);
927         if (rc != 0)
928                 GOTO(stop, rc);
929
930         rc = dt_trans_start_local(env, dt, th);
931         if (rc != 0)
932                 GOTO(stop, rc);
933
934         dt_write_lock(env, obj, 0);
935         if (likely(dt_object_exists(obj) == 0)) {
936                 rc = dt_create(env, obj, la, NULL, dof, th);
937                 if (rc == 0)
938                         rc = dt_record_write(env, obj,
939                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
940                                 &pos, th);
941         }
942         dt_write_unlock(env, obj);
943
944         GOTO(stop, rc);
945
946 stop:
947         dt_trans_stop(env, dt, th);
948
949 log:
950         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
951                LPX64": rc = %d\n",
952                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
953
954         return rc;
955 }
956
957 static int
958 lfsck_layout_lastid_reload(const struct lu_env *env,
959                            struct lfsck_component *com,
960                            struct lfsck_layout_seq *lls)
961 {
962         __u64   lastid;
963         loff_t  pos     = 0;
964         int     rc;
965
966         dt_read_lock(env, lls->lls_lastid_obj, 0);
967         rc = dt_record_read(env, lls->lls_lastid_obj,
968                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
969         dt_read_unlock(env, lls->lls_lastid_obj);
970         if (unlikely(rc != 0))
971                 return rc;
972
973         lastid = le64_to_cpu(lastid);
974         if (lastid < lls->lls_lastid_known) {
975                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
976                 struct lfsck_layout     *lo     = com->lc_file_ram;
977
978                 lls->lls_lastid = lls->lls_lastid_known;
979                 lls->lls_dirty = 1;
980                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
981                         LASSERT(lfsck->li_out_notify != NULL);
982
983                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
984                                              LE_LASTID_REBUILDING);
985                         lo->ll_flags |= LF_CRASHED_LASTID;
986
987                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
988                                "LAST_ID file (1) for the sequence "LPX64
989                                ", old value "LPU64", known value "LPU64"\n",
990                                lfsck_lfsck2name(lfsck), lls->lls_seq,
991                                lastid, lls->lls_lastid);
992                 }
993         } else if (lastid >= lls->lls_lastid) {
994                 lls->lls_lastid = lastid;
995                 lls->lls_dirty = 0;
996         }
997
998         return 0;
999 }
1000
1001 static int
1002 lfsck_layout_lastid_store(const struct lu_env *env,
1003                           struct lfsck_component *com)
1004 {
1005         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1006         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1007         struct dt_device                *dt     = lfsck->li_bottom;
1008         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1009         struct lfsck_layout_seq         *lls;
1010         struct thandle                  *th;
1011         __u64                            lastid;
1012         int                              rc     = 0;
1013         int                              rc1    = 0;
1014
1015         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1016                 loff_t pos = 0;
1017
1018                 if (!lls->lls_dirty)
1019                         continue;
1020
1021                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1022                        "<seq> "LPX64" as <oid> "LPU64"\n",
1023                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1024
1025                 if (bk->lb_param & LPF_DRYRUN) {
1026                         lls->lls_dirty = 0;
1027                         continue;
1028                 }
1029
1030                 th = dt_trans_create(env, dt);
1031                 if (IS_ERR(th)) {
1032                         rc1 = PTR_ERR(th);
1033                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1034                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1035                                lfsck_lfsck2name(com->lc_lfsck),
1036                                lls->lls_seq, rc1);
1037                         continue;
1038                 }
1039
1040                 lastid = cpu_to_le64(lls->lls_lastid);
1041                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1042                                              lfsck_buf_get(env, &lastid,
1043                                                            sizeof(lastid)),
1044                                              pos, th);
1045                 if (rc != 0)
1046                         goto stop;
1047
1048                 rc = dt_trans_start_local(env, dt, th);
1049                 if (rc != 0)
1050                         goto stop;
1051
1052                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1053                 rc = dt_record_write(env, lls->lls_lastid_obj,
1054                                      lfsck_buf_get(env, &lastid,
1055                                      sizeof(lastid)), &pos, th);
1056                 dt_write_unlock(env, lls->lls_lastid_obj);
1057                 if (rc == 0)
1058                         lls->lls_dirty = 0;
1059
1060 stop:
1061                 dt_trans_stop(env, dt, th);
1062                 if (rc != 0) {
1063                         rc1 = rc;
1064                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1065                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1066                                lfsck_lfsck2name(com->lc_lfsck),
1067                                lls->lls_seq, rc1);
1068                 }
1069         }
1070
1071         return rc1;
1072 }
1073
1074 static int
1075 lfsck_layout_lastid_load(const struct lu_env *env,
1076                          struct lfsck_component *com,
1077                          struct lfsck_layout_seq *lls)
1078 {
1079         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1080         struct lfsck_layout     *lo     = com->lc_file_ram;
1081         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1082         struct dt_object        *obj;
1083         loff_t                   pos    = 0;
1084         int                      rc;
1085         ENTRY;
1086
1087         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1088         obj = dt_locate(env, lfsck->li_bottom, fid);
1089         if (IS_ERR(obj))
1090                 RETURN(PTR_ERR(obj));
1091
1092         /* LAST_ID crashed, to be rebuilt */
1093         if (dt_object_exists(obj) == 0) {
1094                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1095                         LASSERT(lfsck->li_out_notify != NULL);
1096
1097                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1098                                              LE_LASTID_REBUILDING);
1099                         lo->ll_flags |= LF_CRASHED_LASTID;
1100
1101                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1102                                "LAST_ID file for sequence "LPX64"\n",
1103                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1104
1105                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1106                             cfs_fail_val > 0) {
1107                                 struct l_wait_info lwi = LWI_TIMEOUT(
1108                                                 cfs_time_seconds(cfs_fail_val),
1109                                                 NULL, NULL);
1110
1111                                 up_write(&com->lc_sem);
1112                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1113                                              !thread_is_running(&lfsck->li_thread),
1114                                              &lwi);
1115                                 down_write(&com->lc_sem);
1116                         }
1117                 }
1118
1119                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1120         } else {
1121                 dt_read_lock(env, obj, 0);
1122                 rc = dt_read(env, obj,
1123                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1124                         &pos);
1125                 dt_read_unlock(env, obj);
1126                 if (rc != 0 && rc != sizeof(__u64))
1127                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1128
1129                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1130                         LASSERT(lfsck->li_out_notify != NULL);
1131
1132                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1133                                              LE_LASTID_REBUILDING);
1134                         lo->ll_flags |= LF_CRASHED_LASTID;
1135
1136                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1137                                "LAST_ID file for the sequence "LPX64
1138                                ": rc = %d\n",
1139                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1140                 }
1141
1142                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1143                 rc = 0;
1144         }
1145
1146         GOTO(out, rc);
1147
1148 out:
1149         if (rc != 0)
1150                 lfsck_object_put(env, obj);
1151         else
1152                 lls->lls_lastid_obj = obj;
1153
1154         return rc;
1155 }
1156
1157 static void lfsck_layout_record_failure(const struct lu_env *env,
1158                                                  struct lfsck_instance *lfsck,
1159                                                  struct lfsck_layout *lo)
1160 {
1161         lo->ll_objs_failed_phase1++;
1162         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1163                 lo->ll_pos_first_inconsistent =
1164                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1165                                                         lfsck->li_di_oit);
1166
1167                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1168                        "inconsistency at the pos ["LPU64"]\n",
1169                        lfsck_lfsck2name(lfsck),
1170                        lo->ll_pos_first_inconsistent);
1171         }
1172 }
1173
1174 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1175                                                struct ptlrpc_request *req,
1176                                                void *args, int rc)
1177 {
1178         struct lfsck_async_interpret_args *laia = args;
1179         struct lfsck_component            *com  = laia->laia_com;
1180         struct lfsck_layout_master_data   *llmd = com->lc_data;
1181         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1182         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1183         struct lfsck_request              *lr   = laia->laia_lr;
1184
1185         switch (lr->lr_event) {
1186         case LE_START:
1187                 if (rc != 0) {
1188                         struct lfsck_layout *lo = com->lc_file_ram;
1189
1190                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1191                                "start: rc = %d\n",
1192                                lfsck_lfsck2name(com->lc_lfsck),
1193                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1194                                ltd->ltd_index, rc);
1195                         lo->ll_flags |= LF_INCOMPLETE;
1196                         break;
1197                 }
1198
1199                 spin_lock(&ltds->ltd_lock);
1200                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1201                         spin_unlock(&ltds->ltd_lock);
1202                         break;
1203                 }
1204
1205                 if (lr->lr_flags & LEF_TO_OST) {
1206                         if (list_empty(&ltd->ltd_layout_list))
1207                                 list_add_tail(&ltd->ltd_layout_list,
1208                                               &llmd->llmd_ost_list);
1209                         if (list_empty(&ltd->ltd_layout_phase_list))
1210                                 list_add_tail(&ltd->ltd_layout_phase_list,
1211                                               &llmd->llmd_ost_phase1_list);
1212                 } else {
1213                         if (list_empty(&ltd->ltd_layout_list))
1214                                 list_add_tail(&ltd->ltd_layout_list,
1215                                               &llmd->llmd_mdt_list);
1216                         if (list_empty(&ltd->ltd_layout_phase_list))
1217                                 list_add_tail(&ltd->ltd_layout_phase_list,
1218                                               &llmd->llmd_mdt_phase1_list);
1219                 }
1220                 spin_unlock(&ltds->ltd_lock);
1221                 break;
1222         case LE_STOP:
1223         case LE_PHASE1_DONE:
1224         case LE_PHASE2_DONE:
1225         case LE_PEER_EXIT:
1226                 if (rc != 0 && rc != -EALREADY)
1227                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1228                                "event = %d, rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck),
1230                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1231                                ltd->ltd_index, lr->lr_event, rc);
1232                 break;
1233         case LE_QUERY: {
1234                 struct lfsck_reply *reply;
1235
1236                 if (rc != 0) {
1237                         spin_lock(&ltds->ltd_lock);
1238                         list_del_init(&ltd->ltd_layout_phase_list);
1239                         list_del_init(&ltd->ltd_layout_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 }
1243
1244                 reply = req_capsule_server_get(&req->rq_pill,
1245                                                &RMF_LFSCK_REPLY);
1246                 if (reply == NULL) {
1247                         rc = -EPROTO;
1248                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1249                                lfsck_lfsck2name(com->lc_lfsck), rc);
1250                         spin_lock(&ltds->ltd_lock);
1251                         list_del_init(&ltd->ltd_layout_phase_list);
1252                         list_del_init(&ltd->ltd_layout_list);
1253                         spin_unlock(&ltds->ltd_lock);
1254                         break;
1255                 }
1256
1257                 switch (reply->lr_status) {
1258                 case LS_SCANNING_PHASE1:
1259                         break;
1260                 case LS_SCANNING_PHASE2:
1261                         spin_lock(&ltds->ltd_lock);
1262                         list_del_init(&ltd->ltd_layout_phase_list);
1263                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1264                                 spin_unlock(&ltds->ltd_lock);
1265                                 break;
1266                         }
1267
1268                         if (lr->lr_flags & LEF_TO_OST)
1269                                 list_add_tail(&ltd->ltd_layout_phase_list,
1270                                               &llmd->llmd_ost_phase2_list);
1271                         else
1272                                 list_add_tail(&ltd->ltd_layout_phase_list,
1273                                               &llmd->llmd_mdt_phase2_list);
1274                         spin_unlock(&ltds->ltd_lock);
1275                         break;
1276                 default:
1277                         spin_lock(&ltds->ltd_lock);
1278                         list_del_init(&ltd->ltd_layout_phase_list);
1279                         list_del_init(&ltd->ltd_layout_list);
1280                         spin_unlock(&ltds->ltd_lock);
1281                         break;
1282                 }
1283                 break;
1284         }
1285         default:
1286                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1287                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1288                 break;
1289         }
1290
1291         if (!laia->laia_shared) {
1292                 lfsck_tgt_put(ltd);
1293                 lfsck_component_put(env, com);
1294         }
1295
1296         return 0;
1297 }
1298
1299 static int lfsck_layout_master_query_others(const struct lu_env *env,
1300                                             struct lfsck_component *com)
1301 {
1302         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1303         struct lfsck_request              *lr    = &info->lti_lr;
1304         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1305         struct lfsck_instance             *lfsck = com->lc_lfsck;
1306         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1307         struct ptlrpc_request_set         *set;
1308         struct lfsck_tgt_descs            *ltds;
1309         struct lfsck_tgt_desc             *ltd;
1310         struct list_head                  *head;
1311         int                                rc    = 0;
1312         int                                rc1   = 0;
1313         ENTRY;
1314
1315         set = ptlrpc_prep_set();
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318
1319         llmd->llmd_touch_gen++;
1320         memset(lr, 0, sizeof(*lr));
1321         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1322         lr->lr_event = LE_QUERY;
1323         lr->lr_active = LFSCK_TYPE_LAYOUT;
1324         laia->laia_com = com;
1325         laia->laia_lr = lr;
1326         laia->laia_shared = 0;
1327
1328         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1329                 ltds = &lfsck->li_mdt_descs;
1330                 lr->lr_flags = 0;
1331                 head = &llmd->llmd_mdt_phase1_list;
1332         } else {
1333
1334 again:
1335                 ltds = &lfsck->li_ost_descs;
1336                 lr->lr_flags = LEF_TO_OST;
1337                 head = &llmd->llmd_ost_phase1_list;
1338         }
1339
1340         laia->laia_ltds = ltds;
1341         spin_lock(&ltds->ltd_lock);
1342         while (!list_empty(head)) {
1343                 ltd = list_entry(head->next,
1344                                  struct lfsck_tgt_desc,
1345                                  ltd_layout_phase_list);
1346                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1347                         break;
1348
1349                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1350                 list_move_tail(&ltd->ltd_layout_phase_list, head);
1351                 atomic_inc(&ltd->ltd_ref);
1352                 laia->laia_ltd = ltd;
1353                 spin_unlock(&ltds->ltd_lock);
1354                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1355                                          lfsck_layout_master_async_interpret,
1356                                          laia, LFSCK_QUERY);
1357                 if (rc != 0) {
1358                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1359                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1360                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1361                                ltd->ltd_index, rc);
1362                         lfsck_tgt_put(ltd);
1363                         rc1 = rc;
1364                 }
1365                 spin_lock(&ltds->ltd_lock);
1366         }
1367         spin_unlock(&ltds->ltd_lock);
1368
1369         rc = ptlrpc_set_wait(set);
1370         if (rc < 0) {
1371                 ptlrpc_set_destroy(set);
1372                 RETURN(rc);
1373         }
1374
1375         if (!(lr->lr_flags & LEF_TO_OST) &&
1376             list_empty(&llmd->llmd_mdt_phase1_list))
1377                 goto again;
1378
1379         ptlrpc_set_destroy(set);
1380
1381         RETURN(rc1 != 0 ? rc1 : rc);
1382 }
1383
1384 static inline bool
1385 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1386 {
1387         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1388                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1389                 list_empty(&llmd->llmd_ost_phase1_list));
1390 }
1391
1392 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1393                                              struct lfsck_component *com,
1394                                              struct lfsck_request *lr)
1395 {
1396         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1397         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1398         struct lfsck_instance             *lfsck = com->lc_lfsck;
1399         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1400         struct lfsck_layout               *lo    = com->lc_file_ram;
1401         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1402         struct ptlrpc_request_set         *set;
1403         struct lfsck_tgt_descs            *ltds;
1404         struct lfsck_tgt_desc             *ltd;
1405         struct lfsck_tgt_desc             *next;
1406         struct list_head                  *head;
1407         __u32                              idx;
1408         int                                rc    = 0;
1409         ENTRY;
1410
1411         set = ptlrpc_prep_set();
1412         if (set == NULL)
1413                 RETURN(-ENOMEM);
1414
1415         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1416         lr->lr_active = LFSCK_TYPE_LAYOUT;
1417         laia->laia_com = com;
1418         laia->laia_lr = lr;
1419         laia->laia_shared = 0;
1420         switch (lr->lr_event) {
1421         case LE_START:
1422                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1423                 ltds = &lfsck->li_ost_descs;
1424                 laia->laia_ltds = ltds;
1425                 down_read(&ltds->ltd_rw_sem);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = lfsck_tgt_get(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         laia->laia_ltd = ltd;
1431                         ltd->ltd_layout_done = 0;
1432                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1433                                         lfsck_layout_master_async_interpret,
1434                                         laia, LFSCK_NOTIFY);
1435                         if (rc != 0) {
1436                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1437                                        "notify %s %x for start: rc = %d\n",
1438                                        lfsck_lfsck2name(lfsck),
1439                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1440                                        "MDT", idx, rc);
1441                                 lfsck_tgt_put(ltd);
1442                                 lo->ll_flags |= LF_INCOMPLETE;
1443                         }
1444                 }
1445                 up_read(&ltds->ltd_rw_sem);
1446
1447                 /* Sync up */
1448                 rc = ptlrpc_set_wait(set);
1449                 if (rc < 0) {
1450                         ptlrpc_set_destroy(set);
1451                         RETURN(rc);
1452                 }
1453
1454                 if (!(bk->lb_param & LPF_ALL_TGT))
1455                         break;
1456
1457                 /* link other MDT targets locallly. */
1458                 ltds = &lfsck->li_mdt_descs;
1459                 spin_lock(&ltds->ltd_lock);
1460                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1461                         ltd = LTD_TGT(ltds, idx);
1462                         LASSERT(ltd != NULL);
1463
1464                         if (!list_empty(&ltd->ltd_layout_list))
1465                                 continue;
1466
1467                         list_add_tail(&ltd->ltd_layout_list,
1468                                       &llmd->llmd_mdt_list);
1469                         list_add_tail(&ltd->ltd_layout_phase_list,
1470                                       &llmd->llmd_mdt_phase1_list);
1471                 }
1472                 spin_unlock(&ltds->ltd_lock);
1473                 break;
1474         case LE_STOP:
1475         case LE_PHASE2_DONE:
1476         case LE_PEER_EXIT: {
1477                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1478                 if (bk->lb_param & LPF_ALL_TGT) {
1479                         head = &llmd->llmd_mdt_list;
1480                         ltds = &lfsck->li_mdt_descs;
1481                         if (lr->lr_event == LE_STOP) {
1482                                 /* unlink other MDT targets locallly. */
1483                                 spin_lock(&ltds->ltd_lock);
1484                                 list_for_each_entry_safe(ltd, next, head,
1485                                                          ltd_layout_list) {
1486                                         list_del_init(&ltd->ltd_layout_phase_list);
1487                                         list_del_init(&ltd->ltd_layout_list);
1488                                 }
1489                                 spin_unlock(&ltds->ltd_lock);
1490
1491                                 lr->lr_flags |= LEF_TO_OST;
1492                                 head = &llmd->llmd_ost_list;
1493                                 ltds = &lfsck->li_ost_descs;
1494                         } else {
1495                                 lr->lr_flags &= ~LEF_TO_OST;
1496                         }
1497                 } else {
1498                         lr->lr_flags |= LEF_TO_OST;
1499                         head = &llmd->llmd_ost_list;
1500                         ltds = &lfsck->li_ost_descs;
1501                 }
1502
1503 again:
1504                 laia->laia_ltds = ltds;
1505                 spin_lock(&ltds->ltd_lock);
1506                 while (!list_empty(head)) {
1507                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1508                                          ltd_layout_list);
1509                         if (!list_empty(&ltd->ltd_layout_phase_list))
1510                                 list_del_init(&ltd->ltd_layout_phase_list);
1511                         list_del_init(&ltd->ltd_layout_list);
1512                         atomic_inc(&ltd->ltd_ref);
1513                         laia->laia_ltd = ltd;
1514                         spin_unlock(&ltds->ltd_lock);
1515                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1516                                         lfsck_layout_master_async_interpret,
1517                                         laia, LFSCK_NOTIFY);
1518                         if (rc != 0) {
1519                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1520                                        "notify %s %x for stop/phase2_done/"
1521                                        "peer_exit: rc = %d\n",
1522                                        lfsck_lfsck2name(lfsck),
1523                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1524                                        "MDT", ltd->ltd_index, rc);
1525                                 lfsck_tgt_put(ltd);
1526                         }
1527                         spin_lock(&ltds->ltd_lock);
1528                 }
1529                 spin_unlock(&ltds->ltd_lock);
1530
1531                 rc = ptlrpc_set_wait(set);
1532                 if (rc < 0) {
1533                         ptlrpc_set_destroy(set);
1534                         RETURN(rc);
1535                 }
1536
1537                 if (!(lr->lr_flags & LEF_TO_OST)) {
1538                         lr->lr_flags |= LEF_TO_OST;
1539                         head = &llmd->llmd_ost_list;
1540                         ltds = &lfsck->li_ost_descs;
1541                         goto again;
1542                 }
1543                 break;
1544         }
1545         case LE_PHASE1_DONE:
1546                 llmd->llmd_touch_gen++;
1547                 ltds = &lfsck->li_mdt_descs;
1548                 laia->laia_ltds = ltds;
1549                 spin_lock(&ltds->ltd_lock);
1550                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1551                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1552                                          struct lfsck_tgt_desc,
1553                                          ltd_layout_phase_list);
1554                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1555                                 break;
1556
1557                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1558                         list_move_tail(&ltd->ltd_layout_phase_list,
1559                                        &llmd->llmd_mdt_phase1_list);
1560                         atomic_inc(&ltd->ltd_ref);
1561                         laia->laia_ltd = ltd;
1562                         spin_unlock(&ltds->ltd_lock);
1563                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1564                                         lfsck_layout_master_async_interpret,
1565                                         laia, LFSCK_NOTIFY);
1566                         if (rc != 0) {
1567                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1568                                        "notify MDT %x for phase1_done: "
1569                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1570                                        ltd->ltd_index, rc);
1571                                 lfsck_tgt_put(ltd);
1572                         }
1573                         spin_lock(&ltds->ltd_lock);
1574                 }
1575                 spin_unlock(&ltds->ltd_lock);
1576                 break;
1577         default:
1578                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1579                        lfsck_lfsck2name(lfsck), lr->lr_event);
1580                 rc = -EINVAL;
1581                 break;
1582         }
1583
1584         rc = ptlrpc_set_wait(set);
1585         ptlrpc_set_destroy(set);
1586
1587         RETURN(rc);
1588 }
1589
1590 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1591                                            struct lfsck_component *com,
1592                                            int rc)
1593 {
1594         struct lfsck_instance   *lfsck = com->lc_lfsck;
1595         struct lfsck_layout     *lo    = com->lc_file_ram;
1596         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1597
1598         down_write(&com->lc_sem);
1599         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1600                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1601         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1602         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1603
1604         if (rc > 0) {
1605                 com->lc_journal = 0;
1606                 if (lo->ll_flags & LF_INCOMPLETE)
1607                         lo->ll_status = LS_PARTIAL;
1608                 else
1609                         lo->ll_status = LS_COMPLETED;
1610                 if (!(bk->lb_param & LPF_DRYRUN))
1611                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1612                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1613                 lo->ll_success_count++;
1614         } else if (rc == 0) {
1615                 lo->ll_status = lfsck->li_status;
1616                 if (lo->ll_status == 0)
1617                         lo->ll_status = LS_STOPPED;
1618         } else {
1619                 lo->ll_status = LS_FAILED;
1620         }
1621
1622         rc = lfsck_layout_store(env, com);
1623         up_write(&com->lc_sem);
1624
1625         return rc;
1626 }
1627
1628 static int lfsck_layout_lock(const struct lu_env *env,
1629                              struct lfsck_component *com,
1630                              struct dt_object *obj,
1631                              struct lustre_handle *lh, __u64 bits)
1632 {
1633         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1634         ldlm_policy_data_t              *policy = &info->lti_policy;
1635         struct ldlm_res_id              *resid  = &info->lti_resid;
1636         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1637         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1638         int                              rc;
1639
1640         LASSERT(lfsck->li_namespace != NULL);
1641
1642         memset(policy, 0, sizeof(*policy));
1643         policy->l_inodebits.bits = bits;
1644         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1645         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1646                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1647                                     ldlm_completion_ast, NULL, NULL, 0,
1648                                     LVB_T_NONE, NULL, lh);
1649         if (rc == ELDLM_OK) {
1650                 rc = 0;
1651         } else {
1652                 memset(lh, 0, sizeof(*lh));
1653                 rc = -EIO;
1654         }
1655
1656         return rc;
1657 }
1658
1659 static void lfsck_layout_unlock(struct lustre_handle *lh)
1660 {
1661         if (lustre_handle_is_used(lh)) {
1662                 ldlm_lock_decref(lh, LCK_EX);
1663                 memset(lh, 0, sizeof(*lh));
1664         }
1665 }
1666
1667 static int lfsck_layout_trans_stop(const struct lu_env *env,
1668                                    struct dt_device *dev,
1669                                    struct thandle *handle, int result)
1670 {
1671         int rc;
1672
1673         handle->th_result = result;
1674         rc = dt_trans_stop(env, dev, handle);
1675         if (rc > 0)
1676                 rc = 0;
1677         else if (rc == 0)
1678                 rc = 1;
1679
1680         return rc;
1681 }
1682
1683 /**
1684  * Get the system default stripe size.
1685  *
1686  * \param[in] env       pointer to the thread context
1687  * \param[in] lfsck     pointer to the lfsck instance
1688  * \param[out] size     pointer to the default stripe size
1689  *
1690  * \retval              0 for success
1691  * \retval              negative error number on failure
1692  */
1693 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1694                                            struct lfsck_instance *lfsck,
1695                                            __u32 *size)
1696 {
1697         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1698         struct dt_object        *root;
1699         int                      rc;
1700
1701         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1702         if (IS_ERR(root))
1703                 return PTR_ERR(root);
1704
1705         /* Get the default stripe size via xattr_get on the backend root. */
1706         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1707                           XATTR_NAME_LOV, BYPASS_CAPA);
1708         if (rc > 0) {
1709                 /* The lum->lmm_stripe_size is LE mode. The *size also
1710                  * should be LE mode. So it is unnecessary to convert. */
1711                 *size = lum->lmm_stripe_size;
1712                 rc = 0;
1713         } else if (unlikely(rc == 0)) {
1714                 rc = -EINVAL;
1715         }
1716
1717         lfsck_object_put(env, root);
1718
1719         return rc;
1720 }
1721
1722 /**
1723  * \retval       +1: repaired
1724  * \retval        0: did nothing
1725  * \retval      -ve: on error
1726  */
1727 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1728                                      struct thandle *handle,
1729                                      struct dt_object *parent,
1730                                      struct lu_fid *cfid,
1731                                      struct lu_buf *buf,
1732                                      struct lov_ost_data_v1 *slot,
1733                                      int fl, __u32 ost_idx)
1734 {
1735         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1736         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1737         struct lu_buf            ea_buf;
1738         int                      rc;
1739         __u32                    magic;
1740         __u16                    count;
1741
1742         magic = le32_to_cpu(lmm->lmm_magic);
1743         count = le16_to_cpu(lmm->lmm_stripe_count);
1744
1745         fid_to_ostid(cfid, oi);
1746         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1747         slot->l_ost_gen = cpu_to_le32(0);
1748         slot->l_ost_idx = cpu_to_le32(ost_idx);
1749
1750         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1751                 struct lov_ost_data_v1 *objs;
1752                 int                     i;
1753
1754                 if (magic == LOV_MAGIC_V1)
1755                         objs = &lmm->lmm_objects[0];
1756                 else
1757                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1758                 for (i = 0; i < count; i++, objs++) {
1759                         if (objs != slot && lovea_slot_is_dummy(objs))
1760                                 break;
1761                 }
1762
1763                 /* If the @slot is the last dummy slot to be refilled,
1764                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1765                 if (i == count)
1766                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1767         }
1768
1769         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1770         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1771                           BYPASS_CAPA);
1772         if (rc == 0)
1773                 rc = 1;
1774
1775         return rc;
1776 }
1777
1778 /**
1779  * \retval       +1: repaired
1780  * \retval        0: did nothing
1781  * \retval      -ve: on error
1782  */
1783 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1784                                      struct lfsck_instance *lfsck,
1785                                      struct thandle *handle,
1786                                      struct dt_object *parent,
1787                                      struct lu_fid *cfid,
1788                                      struct lu_buf *buf, int fl,
1789                                      __u32 ost_idx, __u32 ea_off, bool reset)
1790 {
1791         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1792         struct lov_ost_data_v1  *objs;
1793         int                      rc;
1794         __u16                    count;
1795         bool                     hole   = false;
1796         ENTRY;
1797
1798         if (fl == LU_XATTR_CREATE || reset) {
1799                 __u32 pattern = LOV_PATTERN_RAID0;
1800
1801                 count = ea_off + 1;
1802                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1803
1804                 if (ea_off != 0 || reset) {
1805                         pattern |= LOV_PATTERN_F_HOLE;
1806                         hole = true;
1807                 }
1808
1809                 memset(lmm, 0, buf->lb_len);
1810                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1811                 lmm->lmm_pattern = cpu_to_le32(pattern);
1812                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1813                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1814
1815                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1816                                                      &lmm->lmm_stripe_size);
1817                 if (rc != 0)
1818                         RETURN(rc);
1819
1820                 objs = &lmm->lmm_objects[ea_off];
1821         } else {
1822                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1823                 int     gap;
1824
1825                 count = le16_to_cpu(lmm->lmm_stripe_count);
1826                 if (magic == LOV_MAGIC_V1)
1827                         objs = &lmm->lmm_objects[count];
1828                 else
1829                         objs = &((struct lov_mds_md_v3 *)lmm)->
1830                                                         lmm_objects[count];
1831
1832                 gap = ea_off - count;
1833                 if (gap >= 0)
1834                         count = ea_off + 1;
1835                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1836
1837                 if (gap > 0) {
1838                         memset(objs, 0, gap * sizeof(*objs));
1839                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1840                         hole = true;
1841                 }
1842
1843                 lmm->lmm_layout_gen =
1844                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1845                 objs += gap;
1846         }
1847
1848         lmm->lmm_stripe_count = cpu_to_le16(count);
1849         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1850                                        fl, ost_idx);
1851
1852         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1853                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1854                "reset %s, %s LOV EA hole: rc = %d\n",
1855                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1856                ost_idx, ea_off, fl, reset ? "yes" : "no",
1857                hole ? "with" : "without", rc);
1858
1859         RETURN(rc);
1860 }
1861
1862 /**
1863  * \retval       +1: repaired
1864  * \retval        0: did nothing
1865  * \retval      -ve: on error
1866  */
1867 static int lfsck_layout_update_pfid(const struct lu_env *env,
1868                                     struct lfsck_component *com,
1869                                     struct dt_object *parent,
1870                                     struct lu_fid *cfid,
1871                                     struct dt_device *cdev, __u32 ea_off)
1872 {
1873         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1874         struct dt_object        *child;
1875         struct thandle          *handle;
1876         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1877         struct lu_buf           *buf;
1878         int                      rc     = 0;
1879         ENTRY;
1880
1881         child = lfsck_object_find_by_dev(env, cdev, cfid);
1882         if (IS_ERR(child))
1883                 RETURN(PTR_ERR(child));
1884
1885         handle = dt_trans_create(env, cdev);
1886         if (IS_ERR(handle))
1887                 GOTO(out, rc = PTR_ERR(handle));
1888
1889         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1890         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1891         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1892          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1893          * parent MDT-object's layout EA. */
1894         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1895         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1896
1897         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1898         if (rc != 0)
1899                 GOTO(stop, rc);
1900
1901         rc = dt_trans_start(env, cdev, handle);
1902         if (rc != 0)
1903                 GOTO(stop, rc);
1904
1905         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1906                           BYPASS_CAPA);
1907
1908         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1909
1910 stop:
1911         dt_trans_stop(env, cdev, handle);
1912
1913 out:
1914         lu_object_put(env, &child->do_lu);
1915
1916         return rc;
1917 }
1918
1919 /**
1920  * This function will create the MDT-object with the given (partial) LOV EA.
1921  *
1922  * Under some data corruption cases, the MDT-object of the file may be lost,
1923  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1924  * re-create the MDT-object with the orphan OST-object(s) information.
1925  *
1926  * On the other hand, the LFSCK may has created some OST-object for repairing
1927  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1928  * the old OST-object is there and should replace the former new created OST
1929  * object. Unfortunately, some others have modified such newly created object.
1930  * To keep the data (both new and old), the LFSCK will create MDT-object with
1931  * new FID to reference the original OST-object.
1932  *
1933  * \param[in] env       pointer to the thread context
1934  * \param[in] com       pointer to the lfsck component
1935  * \param[in] ltd       pointer to target device descriptor
1936  * \param[in] rec       pointer to the record for the orphan OST-object
1937  * \param[in] cfid      pointer to FID for the orphan OST-object
1938  * \param[in] infix     additional information, such as the FID for original
1939  *                      MDT-object and the stripe offset in the LOV EA
1940  * \param[in] type      the type for describing why the orphan MDT-object is
1941  *                      created. The rules are as following:
1942  *
1943  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1944  *                      same slot in the layout EA. Then the LFSCK will create
1945  *                      new MDT-object(s) to hold the conflict OST-object(s).
1946  *
1947  *  type "N":           The orphan OST-object does not know which one was the
1948  *                      real parent MDT-object, so the LFSCK uses new FID for
1949  *                      its parent MDT-object.
1950  *
1951  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1952  *                      but does not know the position (the file name) in the
1953  *                      namespace.
1954  *
1955  * The orphan name will be like:
1956  * ${FID}-${infix}-${type}-${conflict_version}
1957  *
1958  * \param[in] ea_off    the stripe offset in the LOV EA
1959  *
1960  * \retval              positive on repaired something
1961  * \retval              0 if needs to repair nothing
1962  * \retval              negative error number on failure
1963  */
1964 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1965                                         struct lfsck_component *com,
1966                                         struct lfsck_tgt_desc *ltd,
1967                                         struct lu_orphan_rec *rec,
1968                                         struct lu_fid *cfid,
1969                                         const char *infix,
1970                                         const char *type,
1971                                         __u32 ea_off)
1972 {
1973         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1974         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1975         char                            *name   = info->lti_key;
1976         struct lu_attr                  *la     = &info->lti_la;
1977         struct dt_object_format         *dof    = &info->lti_dof;
1978         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1979         struct lu_fid                   *pfid   = &rec->lor_fid;
1980         struct lu_fid                   *tfid   = &info->lti_fid3;
1981         struct dt_device                *next   = lfsck->li_next;
1982         struct dt_object                *pobj   = NULL;
1983         struct dt_object                *cobj   = NULL;
1984         struct thandle                  *th     = NULL;
1985         struct lu_buf                    pbuf   = { 0 };
1986         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1987         struct lu_buf                    lov_buf;
1988         struct lustre_handle             lh     = { 0 };
1989         struct linkea_data               ldata  = { 0 };
1990         struct lu_buf                    linkea_buf;
1991         const struct lu_name            *pname;
1992         int                              size   = 0;
1993         int                              idx    = 0;
1994         int                              rc     = 0;
1995         ENTRY;
1996
1997         /* Create .lustre/lost+found/MDTxxxx when needed. */
1998         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1999                 rc = lfsck_create_lpf(env, lfsck);
2000                 if (rc != 0)
2001                         GOTO(log, rc);
2002         }
2003
2004         if (fid_is_zero(pfid)) {
2005                 struct filter_fid *ff = &info->lti_new_pfid;
2006
2007                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2008                 if (rc != 0)
2009                         RETURN(rc);
2010
2011                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2012                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2013                 /* Currently, the filter_fid::ff_parent::f_ver is not the
2014                  * real parent MDT-object's FID::f_ver, instead it is the
2015                  * OST-object index in its parent MDT-object's layout EA. */
2016                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
2017                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
2018                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2019                 if (IS_ERR(cobj))
2020                         GOTO(log, rc = PTR_ERR(cobj));
2021         }
2022
2023         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
2024         if (IS_ERR(pobj))
2025                 GOTO(put, rc = PTR_ERR(pobj));
2026
2027         LASSERT(infix != NULL);
2028         LASSERT(type != NULL);
2029
2030         do {
2031                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2032                          type, idx++);
2033                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2034                                (const struct dt_key *)name, BYPASS_CAPA);
2035                 if (rc != 0 && rc != -ENOENT)
2036                         GOTO(put, rc);
2037         } while (rc == 0);
2038
2039         rc = linkea_data_new(&ldata,
2040                              &lfsck_env_info(env)->lti_linkea_buf);
2041         if (rc != 0)
2042                 GOTO(put, rc);
2043
2044         pname = lfsck_name_get_const(env, name, strlen(name));
2045         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2046         if (rc != 0)
2047                 GOTO(put, rc);
2048
2049         memset(la, 0, sizeof(*la));
2050         la->la_uid = rec->lor_uid;
2051         la->la_gid = rec->lor_gid;
2052         la->la_mode = S_IFREG | S_IRUSR;
2053         la->la_valid = LA_MODE | LA_UID | LA_GID;
2054
2055         memset(dof, 0, sizeof(*dof));
2056         dof->dof_type = dt_mode_to_dft(S_IFREG);
2057
2058         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2059         if (ea_buf->lb_len < size) {
2060                 lu_buf_realloc(ea_buf, size);
2061                 if (ea_buf->lb_buf == NULL)
2062                         GOTO(put, rc = -ENOMEM);
2063         }
2064
2065         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2066          *
2067          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2068          *      because creating MDT-object for orphan OST-object is rare, we
2069          *      do not much care about the performance. It can be improved in
2070          *      the future when needed. */
2071         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
2072                                MDS_INODELOCK_UPDATE);
2073         if (rc != 0)
2074                 GOTO(put, rc);
2075
2076         th = dt_trans_create(env, next);
2077         if (IS_ERR(th))
2078                 GOTO(unlock, rc = PTR_ERR(th));
2079
2080         /* 1a. Update OST-object's parent information remotely.
2081          *
2082          * If other subsequent modifications failed, then next LFSCK scanning
2083          * will process the OST-object as orphan again with known parent FID. */
2084         if (cobj != NULL) {
2085                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
2086                                           0, th);
2087                 if (rc != 0)
2088                         GOTO(stop, rc);
2089         }
2090
2091         /* 2a. Create the MDT-object locally. */
2092         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2093         if (rc != 0)
2094                 GOTO(stop, rc);
2095
2096         /* 3a. Add layout EA for the MDT-object. */
2097         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2098         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2099                                   LU_XATTR_CREATE, th);
2100         if (rc != 0)
2101                 GOTO(stop, rc);
2102
2103         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2104         dtrec->rec_fid = pfid;
2105         dtrec->rec_type = S_IFREG;
2106         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2107                                (const struct dt_rec *)dtrec,
2108                                (const struct dt_key *)name, th);
2109         if (rc != 0)
2110                 GOTO(stop, rc);
2111
2112         /* 5a. insert linkEA for parent. */
2113         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2114                        ldata.ld_leh->leh_len);
2115         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2116                                   XATTR_NAME_LINK, 0, th);
2117         if (rc != 0)
2118                 GOTO(stop, rc);
2119
2120         rc = dt_trans_start(env, next, th);
2121         if (rc != 0)
2122                 GOTO(stop, rc);
2123
2124         /* 1b. Update OST-object's parent information remotely. */
2125         if (cobj != NULL) {
2126                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
2127                                   BYPASS_CAPA);
2128                 if (rc != 0)
2129                         GOTO(stop, rc);
2130         }
2131
2132         dt_write_lock(env, pobj, 0);
2133         /* 2b. Create the MDT-object locally. */
2134         rc = dt_create(env, pobj, la, NULL, dof, th);
2135         if (rc == 0)
2136                 /* 3b. Add layout EA for the MDT-object. */
2137                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2138                                                &lov_buf, LU_XATTR_CREATE,
2139                                                ltd->ltd_index, ea_off, false);
2140         dt_write_unlock(env, pobj);
2141         if (rc < 0)
2142                 GOTO(stop, rc);
2143
2144         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2145         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
2146                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2147         if (rc != 0)
2148                 GOTO(stop, rc);
2149
2150         /* 5b. insert linkEA for parent. */
2151         rc = dt_xattr_set(env, pobj, &linkea_buf,
2152                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2153
2154         GOTO(stop, rc);
2155
2156 stop:
2157         dt_trans_stop(env, next, th);
2158
2159 unlock:
2160         lfsck_layout_unlock(&lh);
2161
2162 put:
2163         if (cobj != NULL && !IS_ERR(cobj))
2164                 lu_object_put(env, &cobj->do_lu);
2165         if (pobj != NULL && !IS_ERR(pobj))
2166                 lu_object_put(env, &pobj->do_lu);
2167
2168 log:
2169         if (rc < 0)
2170                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2171                        "recreate the lost MDT-object: parent "DFID
2172                        ", child "DFID", OST-index %u, stripe-index %u, "
2173                        "infix %s, type %s: rc = %d\n",
2174                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2175                        ltd->ltd_index, ea_off, infix, type, rc);
2176
2177         return rc >= 0 ? 1 : rc;
2178 }
2179
2180 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2181                                                    struct lfsck_component *com,
2182                                                    const struct lu_fid *fid,
2183                                                    __u32 index)
2184 {
2185         struct lfsck_thread_info *info  = lfsck_env_info(env);
2186         struct lfsck_request     *lr    = &info->lti_lr;
2187         struct lfsck_instance    *lfsck = com->lc_lfsck;
2188         struct lfsck_tgt_desc    *ltd;
2189         struct ptlrpc_request    *req;
2190         struct lfsck_request     *tmp;
2191         struct obd_export        *exp;
2192         int                       rc    = 0;
2193         ENTRY;
2194
2195         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2196         if (unlikely(ltd == NULL))
2197                 RETURN(-ENXIO);
2198
2199         exp = ltd->ltd_exp;
2200         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2201                 GOTO(put, rc = -EOPNOTSUPP);
2202
2203         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2204         if (req == NULL)
2205                 GOTO(put, rc = -ENOMEM);
2206
2207         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2208         if (rc != 0) {
2209                 ptlrpc_request_free(req);
2210
2211                 GOTO(put, rc);
2212         }
2213
2214         memset(lr, 0, sizeof(*lr));
2215         lr->lr_event = LE_CONDITIONAL_DESTROY;
2216         lr->lr_active = LFSCK_TYPE_LAYOUT;
2217         lr->lr_fid = *fid;
2218
2219         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2220         *tmp = *lr;
2221         ptlrpc_request_set_replen(req);
2222
2223         rc = ptlrpc_queue_wait(req);
2224         ptlrpc_req_finished(req);
2225
2226         GOTO(put, rc);
2227
2228 put:
2229         lfsck_tgt_put(ltd);
2230
2231         return rc;
2232 }
2233
2234 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2235                                                   struct lfsck_component *com,
2236                                                   struct lfsck_request *lr)
2237 {
2238         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2239         struct lu_attr                  *la     = &info->lti_la;
2240         ldlm_policy_data_t              *policy = &info->lti_policy;
2241         struct ldlm_res_id              *resid  = &info->lti_resid;
2242         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2243         struct dt_device                *dev    = lfsck->li_bottom;
2244         struct lu_fid                   *fid    = &lr->lr_fid;
2245         struct dt_object                *obj;
2246         struct thandle                  *th     = NULL;
2247         struct lustre_handle             lh     = { 0 };
2248         __u64                            flags  = 0;
2249         int                              rc     = 0;
2250         ENTRY;
2251
2252         obj = lfsck_object_find_by_dev(env, dev, fid);
2253         if (IS_ERR(obj))
2254                 RETURN(PTR_ERR(obj));
2255
2256         dt_read_lock(env, obj, 0);
2257         if (dt_object_exists(obj) == 0 ||
2258             lfsck_is_dead_obj(obj)) {
2259                 dt_read_unlock(env, obj);
2260
2261                 GOTO(put, rc = -ENOENT);
2262         }
2263
2264         /* Get obj's attr without lock firstly. */
2265         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2266         dt_read_unlock(env, obj);
2267         if (rc != 0)
2268                 GOTO(put, rc);
2269
2270         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2271                 GOTO(put, rc = -ETXTBSY);
2272
2273         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2274         LASSERT(lfsck->li_namespace != NULL);
2275
2276         memset(policy, 0, sizeof(*policy));
2277         policy->l_extent.end = OBD_OBJECT_EOF;
2278         ost_fid_build_resid(fid, resid);
2279         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2280                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2281                                     ldlm_completion_ast, NULL, NULL, 0,
2282                                     LVB_T_NONE, NULL, &lh);
2283         if (rc != ELDLM_OK)
2284                 GOTO(put, rc = -EIO);
2285
2286         dt_write_lock(env, obj, 0);
2287         /* Get obj's attr within lock again. */
2288         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2289         if (rc != 0)
2290                 GOTO(unlock, rc);
2291
2292         if (la->la_ctime != 0)
2293                 GOTO(unlock, rc = -ETXTBSY);
2294
2295         th = dt_trans_create(env, dev);
2296         if (IS_ERR(th))
2297                 GOTO(unlock, rc = PTR_ERR(th));
2298
2299         rc = dt_declare_ref_del(env, obj, th);
2300         if (rc != 0)
2301                 GOTO(stop, rc);
2302
2303         rc = dt_declare_destroy(env, obj, th);
2304         if (rc != 0)
2305                 GOTO(stop, rc);
2306
2307         rc = dt_trans_start_local(env, dev, th);
2308         if (rc != 0)
2309                 GOTO(stop, rc);
2310
2311         rc = dt_ref_del(env, obj, th);
2312         if (rc != 0)
2313                 GOTO(stop, rc);
2314
2315         rc = dt_destroy(env, obj, th);
2316         if (rc == 0)
2317                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2318                        "OST-object "DFID" that was created for reparing "
2319                        "dangling referenced case. But the original missed "
2320                        "OST-object is found now.\n",
2321                        lfsck_lfsck2name(lfsck), PFID(fid));
2322
2323         GOTO(stop, rc);
2324
2325 stop:
2326         dt_trans_stop(env, dev, th);
2327
2328 unlock:
2329         dt_write_unlock(env, obj);
2330         ldlm_lock_decref(&lh, LCK_EX);
2331
2332 put:
2333         lu_object_put(env, &obj->do_lu);
2334
2335         return rc;
2336 }
2337
2338 /**
2339  * Some OST-object has occupied the specified layout EA slot.
2340  * Such OST-object may be generated by the LFSCK when repair
2341  * dangling referenced MDT-object, which can be indicated by
2342  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2343  * is true and such OST-object has not been modified yet, we
2344  * will replace it with the orphan OST-object; otherwise the
2345  * LFSCK will create new MDT-object to reference the orphan.
2346  *
2347  * \retval       +1: repaired
2348  * \retval        0: did nothing
2349  * \retval      -ve: on error
2350  */
2351 static int lfsck_layout_conflict_create(const struct lu_env *env,
2352                                         struct lfsck_component *com,
2353                                         struct lfsck_tgt_desc *ltd,
2354                                         struct lu_orphan_rec *rec,
2355                                         struct dt_object *parent,
2356                                         struct lu_fid *cfid,
2357                                         struct lu_buf *ea_buf,
2358                                         struct lov_ost_data_v1 *slot,
2359                                         __u32 ea_off)
2360 {
2361         struct lfsck_thread_info *info          = lfsck_env_info(env);
2362         struct lu_fid            *cfid2         = &info->lti_fid2;
2363         struct ost_id            *oi            = &info->lti_oi;
2364         char                     *infix         = info->lti_tmpbuf;
2365         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2366         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2367         struct thandle           *th            = NULL;
2368         struct lustre_handle      lh            = { 0 };
2369         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2370         int                       rc            = 0;
2371         ENTRY;
2372
2373         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2374         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2375         if (rc != 0)
2376                 GOTO(out, rc);
2377
2378         /* Hold layout lock on the parent to prevent others to access. */
2379         rc = lfsck_layout_lock(env, com, parent, &lh,
2380                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2381         if (rc != 0)
2382                 GOTO(out, rc);
2383
2384         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2385
2386         /* If the conflict OST-obejct is not created for fixing dangling
2387          * referenced MDT-object in former LFSCK check/repair, or it has
2388          * been modified by others, then we cannot destroy it. Re-create
2389          * a new MDT-object for the orphan OST-object. */
2390         if (rc == -ETXTBSY) {
2391                 /* No need the layout lock on the original parent. */
2392                 lfsck_layout_unlock(&lh);
2393
2394                 fid_zero(&rec->lor_fid);
2395                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2396                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2397                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2398                                                   infix, "C", ea_off);
2399
2400                 RETURN(rc);
2401         }
2402
2403         if (rc != 0 && rc != -ENOENT)
2404                 GOTO(unlock, rc);
2405
2406         th = dt_trans_create(env, dev);
2407         if (IS_ERR(th))
2408                 GOTO(unlock, rc = PTR_ERR(th));
2409
2410         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2411                                   LU_XATTR_REPLACE, th);
2412         if (rc != 0)
2413                 GOTO(stop, rc);
2414
2415         rc = dt_trans_start_local(env, dev, th);
2416         if (rc != 0)
2417                 GOTO(stop, rc);
2418
2419         dt_write_lock(env, parent, 0);
2420         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2421         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2422                                        LU_XATTR_REPLACE, ltd->ltd_index);
2423         dt_write_unlock(env, parent);
2424
2425         GOTO(stop, rc);
2426
2427 stop:
2428         dt_trans_stop(env, dev, th);
2429
2430 unlock:
2431         lfsck_layout_unlock(&lh);
2432
2433 out:
2434         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2435                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2436                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2437                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2438                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2439                ea_off, rc);
2440
2441         return rc >= 0 ? 1 : rc;
2442 }
2443
2444 /**
2445  * \retval       +1: repaired
2446  * \retval        0: did nothing
2447  * \retval      -ve: on error
2448  */
2449 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2450                                        struct lfsck_component *com,
2451                                        struct lfsck_tgt_desc *ltd,
2452                                        struct lu_orphan_rec *rec,
2453                                        struct dt_object *parent,
2454                                        struct lu_fid *cfid,
2455                                        __u32 ost_idx, __u32 ea_off)
2456 {
2457         struct lfsck_thread_info *info          = lfsck_env_info(env);
2458         struct lu_buf            *buf           = &info->lti_big_buf;
2459         struct lu_fid            *fid           = &info->lti_fid2;
2460         struct ost_id            *oi            = &info->lti_oi;
2461         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2462         struct dt_device         *dt            = lfsck->li_bottom;
2463         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2464         struct thandle            *handle       = NULL;
2465         size_t                    lovea_size;
2466         struct lov_mds_md_v1     *lmm;
2467         struct lov_ost_data_v1   *objs;
2468         struct lustre_handle      lh            = { 0 };
2469         __u32                     magic;
2470         int                       fl            = 0;
2471         int                       rc            = 0;
2472         int                       rc1;
2473         int                       i;
2474         __u16                     count;
2475         bool                      locked        = false;
2476         ENTRY;
2477
2478         rc = lfsck_layout_lock(env, com, parent, &lh,
2479                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2480         if (rc != 0) {
2481                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2482                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2483                        "stripe-index %u: rc = %d\n",
2484                        lfsck_lfsck2name(lfsck), PFID(cfid),
2485                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2486
2487                 RETURN(rc);
2488         }
2489
2490 again:
2491         if (locked) {
2492                 dt_write_unlock(env, parent);
2493                 locked = false;
2494         }
2495
2496         if (handle != NULL) {
2497                 dt_trans_stop(env, dt, handle);
2498                 handle = NULL;
2499         }
2500
2501         if (rc < 0)
2502                 GOTO(unlock_layout, rc);
2503
2504         lovea_size = rc;
2505         if (buf->lb_len < lovea_size) {
2506                 lu_buf_realloc(buf, lovea_size);
2507                 if (buf->lb_buf == NULL)
2508                         GOTO(unlock_layout, rc = -ENOMEM);
2509         }
2510
2511         if (!(bk->lb_param & LPF_DRYRUN)) {
2512                 handle = dt_trans_create(env, dt);
2513                 if (IS_ERR(handle))
2514                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2515
2516                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2517                                           fl, handle);
2518                 if (rc != 0)
2519                         GOTO(stop, rc);
2520
2521                 rc = dt_trans_start_local(env, dt, handle);
2522                 if (rc != 0)
2523                         GOTO(stop, rc);
2524         }
2525
2526         dt_write_lock(env, parent, 0);
2527         locked = true;
2528         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2529         if (rc == -ERANGE) {
2530                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2531                                   BYPASS_CAPA);
2532                 LASSERT(rc != 0);
2533                 goto again;
2534         } else if (rc == -ENODATA || rc == 0) {
2535                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2536                 /* If the declared is not big enough, re-try. */
2537                 if (buf->lb_len < lovea_size) {
2538                         rc = lovea_size;
2539                         goto again;
2540                 }
2541                 fl = LU_XATTR_CREATE;
2542         } else if (rc < 0) {
2543                 GOTO(unlock_parent, rc);
2544         } else if (unlikely(buf->lb_len == 0)) {
2545                 goto again;
2546         } else {
2547                 fl = LU_XATTR_REPLACE;
2548                 lovea_size = rc;
2549         }
2550
2551         if (fl == LU_XATTR_CREATE) {
2552                 if (bk->lb_param & LPF_DRYRUN)
2553                         GOTO(unlock_parent, rc = 1);
2554
2555                 LASSERT(buf->lb_len >= lovea_size);
2556
2557                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2558                                                buf, fl, ost_idx, ea_off, false);
2559
2560                 GOTO(unlock_parent, rc);
2561         }
2562
2563         lmm = buf->lb_buf;
2564         rc1 = lfsck_layout_verify_header(lmm);
2565
2566         /* If the LOV EA crashed, the rebuild it. */
2567         if (rc1 == -EINVAL) {
2568                 if (bk->lb_param & LPF_DRYRUN)
2569                         GOTO(unlock_parent, rc = 1);
2570
2571                 LASSERT(buf->lb_len >= lovea_size);
2572
2573                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2574                                                buf, fl, ost_idx, ea_off, true);
2575
2576                 GOTO(unlock_parent, rc);
2577         }
2578
2579         /* For other unknown magic/pattern, keep the current LOV EA. */
2580         if (rc1 != 0)
2581                 GOTO(unlock_parent, rc = rc1);
2582
2583         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2584          * been verified in lfsck_layout_verify_header() already. If some
2585          * new magic introduced in the future, then layout LFSCK needs to
2586          * be updated also. */
2587         magic = le32_to_cpu(lmm->lmm_magic);
2588         if (magic == LOV_MAGIC_V1) {
2589                 objs = &lmm->lmm_objects[0];
2590         } else {
2591                 LASSERT(magic == LOV_MAGIC_V3);
2592                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2593         }
2594
2595         count = le16_to_cpu(lmm->lmm_stripe_count);
2596         if (count == 0)
2597                 GOTO(unlock_parent, rc = -EINVAL);
2598         LASSERT(count > 0);
2599
2600         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2601         if (count <= ea_off) {
2602                 if (bk->lb_param & LPF_DRYRUN)
2603                         GOTO(unlock_parent, rc = 1);
2604
2605                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2606                 /* If the declared is not big enough, re-try. */
2607                 if (buf->lb_len < lovea_size) {
2608                         rc = lovea_size;
2609                         goto again;
2610                 }
2611
2612                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2613                                                buf, fl, ost_idx, ea_off, false);
2614
2615                 GOTO(unlock_parent, rc);
2616         }
2617
2618         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2619
2620         for (i = 0; i < count; i++, objs++) {
2621                 /* The MDT-object was created via lfsck_layout_recover_create()
2622                  * by others before, and we fill the dummy layout EA. */
2623                 if (lovea_slot_is_dummy(objs)) {
2624                         if (i != ea_off)
2625                                 continue;
2626
2627                         if (bk->lb_param & LPF_DRYRUN)
2628                                 GOTO(unlock_parent, rc = 1);
2629
2630                         lmm->lmm_layout_gen =
2631                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2632                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2633                                                        cfid, buf, objs, fl,
2634                                                        ost_idx);
2635
2636                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2637                                "dummy layout slot for "DFID": parent "DFID
2638                                ", OST-index %u, stripe-index %u: rc = %d\n",
2639                                lfsck_lfsck2name(lfsck), PFID(cfid),
2640                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2641
2642                         GOTO(unlock_parent, rc);
2643                 }
2644
2645                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2646                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2647                 if (rc != 0) {
2648                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2649                                "invalid layout EA at the slot %d, index %u\n",
2650                                lfsck_lfsck2name(lfsck),
2651                                PFID(lfsck_dto2fid(parent)), i,
2652                                le32_to_cpu(objs->l_ost_idx));
2653
2654                         GOTO(unlock_parent, rc);
2655                 }
2656
2657                 /* It should be rare case, the slot is there, but the LFSCK
2658                  * does not handle it during the first-phase cycle scanning. */
2659                 if (unlikely(lu_fid_eq(fid, cfid))) {
2660                         if (i == ea_off) {
2661                                 GOTO(unlock_parent, rc = 0);
2662                         } else {
2663                                 /* Rare case that the OST-object index
2664                                  * does not match the parent MDT-object
2665                                  * layout EA. We trust the later one. */
2666                                 if (bk->lb_param & LPF_DRYRUN)
2667                                         GOTO(unlock_parent, rc = 1);
2668
2669                                 dt_write_unlock(env, parent);
2670                                 if (handle != NULL)
2671                                         dt_trans_stop(env, dt, handle);
2672                                 lfsck_layout_unlock(&lh);
2673                                 rc = lfsck_layout_update_pfid(env, com, parent,
2674                                                         cfid, ltd->ltd_tgt, i);
2675
2676                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2677                                        "updated OST-object's pfid for "DFID
2678                                        ": parent "DFID", OST-index %u, "
2679                                        "stripe-index %u: rc = %d\n",
2680                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2681                                        PFID(lfsck_dto2fid(parent)),
2682                                        ltd->ltd_index, i, rc);
2683
2684                                 RETURN(rc);
2685                         }
2686                 }
2687         }
2688
2689         /* The MDT-object exists, but related layout EA slot is occupied
2690          * by others. */
2691         if (bk->lb_param & LPF_DRYRUN)
2692                 GOTO(unlock_parent, rc = 1);
2693
2694         dt_write_unlock(env, parent);
2695         if (handle != NULL)
2696                 dt_trans_stop(env, dt, handle);
2697         lfsck_layout_unlock(&lh);
2698         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2699                 objs = &lmm->lmm_objects[ea_off];
2700         else
2701                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2702         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2703                                           buf, objs, ea_off);
2704
2705         RETURN(rc);
2706
2707 unlock_parent:
2708         if (locked)
2709                 dt_write_unlock(env, parent);
2710
2711 stop:
2712         if (handle != NULL)
2713                 dt_trans_stop(env, dt, handle);
2714
2715 unlock_layout:
2716         lfsck_layout_unlock(&lh);
2717
2718         return rc;
2719 }
2720
2721 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2722                                         struct lfsck_component *com,
2723                                         struct lfsck_tgt_desc *ltd,
2724                                         struct lu_orphan_rec *rec,
2725                                         struct lu_fid *cfid)
2726 {
2727         struct lfsck_layout     *lo     = com->lc_file_ram;
2728         struct lu_fid           *pfid   = &rec->lor_fid;
2729         struct dt_object        *parent = NULL;
2730         __u32                    ea_off = pfid->f_stripe_idx;
2731         int                      rc     = 0;
2732         ENTRY;
2733
2734         if (!fid_is_sane(cfid))
2735                 GOTO(out, rc = -EINVAL);
2736
2737         if (fid_is_zero(pfid)) {
2738                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2739                                                   "", "N", ea_off);
2740                 GOTO(out, rc);
2741         }
2742
2743         pfid->f_ver = 0;
2744         if (!fid_is_sane(pfid))
2745                 GOTO(out, rc = -EINVAL);
2746
2747         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2748         if (IS_ERR(parent))
2749                 GOTO(out, rc = PTR_ERR(parent));
2750
2751         if (unlikely(dt_object_remote(parent) != 0))
2752                 GOTO(put, rc = -EXDEV);
2753
2754         if (dt_object_exists(parent) == 0) {
2755                 lu_object_put(env, &parent->do_lu);
2756                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2757                                                   "", "R", ea_off);
2758                 GOTO(out, rc);
2759         }
2760
2761         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2762                 GOTO(put, rc = -EISDIR);
2763
2764         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2765                                          ltd->ltd_index, ea_off);
2766
2767         GOTO(put, rc);
2768
2769 put:
2770         if (rc <= 0)
2771                 lu_object_put(env, &parent->do_lu);
2772         else
2773                 /* The layout EA is changed, need to be reloaded next time. */
2774                 lu_object_put_nocache(env, &parent->do_lu);
2775
2776 out:
2777         down_write(&com->lc_sem);
2778         com->lc_new_scanned++;
2779         com->lc_new_checked++;
2780         if (rc > 0) {
2781                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2782                 rc = 0;
2783         } else if (rc < 0) {
2784                 lo->ll_objs_failed_phase2++;
2785         }
2786         up_write(&com->lc_sem);
2787
2788         return rc;
2789 }
2790
2791 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2792                                     struct lfsck_component *com,
2793                                     struct lfsck_tgt_desc *ltd)
2794 {
2795         struct lfsck_layout             *lo     = com->lc_file_ram;
2796         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2797         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2798         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2799         struct ost_id                   *oi     = &info->lti_oi;
2800         struct lu_fid                   *fid    = &info->lti_fid;
2801         struct dt_object                *obj;
2802         const struct dt_it_ops          *iops;
2803         struct dt_it                    *di;
2804         int                              rc     = 0;
2805         ENTRY;
2806
2807         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2808                "scanning for OST%04x\n",
2809                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2810
2811         ostid_set_seq(oi, FID_SEQ_IDIF);
2812         ostid_set_id(oi, 0);
2813         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2814         if (rc != 0)
2815                 GOTO(log, rc);
2816
2817         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2818         if (unlikely(IS_ERR(obj)))
2819                 GOTO(log, rc = PTR_ERR(obj));
2820
2821         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2822         if (rc != 0)
2823                 GOTO(put, rc);
2824
2825         iops = &obj->do_index_ops->dio_it;
2826         di = iops->init(env, obj, 0, BYPASS_CAPA);
2827         if (IS_ERR(di))
2828                 GOTO(put, rc = PTR_ERR(di));
2829
2830         rc = iops->load(env, di, 0);
2831         if (rc == -ESRCH) {
2832                 /* -ESRCH means that the orphan OST-objects rbtree has been
2833                  * cleanup because of the OSS server restart or other errors. */
2834                 lo->ll_flags |= LF_INCOMPLETE;
2835                 GOTO(fini, rc);
2836         }
2837
2838         if (rc == 0)
2839                 rc = iops->next(env, di);
2840         else if (rc > 0)
2841                 rc = 0;
2842
2843         if (rc < 0)
2844                 GOTO(fini, rc);
2845
2846         if (rc > 0)
2847                 GOTO(fini, rc = 0);
2848
2849         do {
2850                 struct dt_key           *key;
2851                 struct lu_orphan_rec    *rec = &info->lti_rec;
2852
2853                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2854                     cfs_fail_val > 0) {
2855                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2856                         struct l_wait_info       lwi;
2857
2858                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2859                                           NULL, NULL);
2860                         l_wait_event(thread->t_ctl_waitq,
2861                                      !thread_is_running(thread),
2862                                      &lwi);
2863                 }
2864
2865                 key = iops->key(env, di);
2866                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2867                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2868                 if (rc == 0)
2869                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2870                                         &com->lc_fid_latest_scanned_phase2);
2871                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2872                         GOTO(fini, rc);
2873
2874                 lfsck_control_speed_by_self(com);
2875                 do {
2876                         rc = iops->next(env, di);
2877                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2878         } while (rc == 0);
2879
2880         GOTO(fini, rc);
2881
2882 fini:
2883         iops->put(env, di);
2884         iops->fini(env, di);
2885 put:
2886         lu_object_put(env, &obj->do_lu);
2887
2888 log:
2889         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2890                "scanning for OST%04x: rc = %d\n",
2891                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2892
2893         return rc > 0 ? 0 : rc;
2894 }
2895
2896 /* For the MDT-object with dangling reference, we need to repare the
2897  * inconsistency according to the LFSCK sponsor's requirement:
2898  *
2899  * 1) Keep the inconsistency there and report the inconsistency case,
2900  *    then give the chance to the application to find related issues,
2901  *    and the users can make the decision about how to handle it with
2902  *    more human knownledge. (by default)
2903  *
2904  * 2) Re-create the missed OST-object with the FID/owner information. */
2905 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2906                                         struct lfsck_component *com,
2907                                         struct lfsck_layout_req *llr,
2908                                         const struct lu_attr *pla)
2909 {
2910         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2911         struct filter_fid               *pfid   = &info->lti_new_pfid;
2912         struct dt_allocation_hint       *hint   = &info->lti_hint;
2913         struct lu_attr                  *cla    = &info->lti_la2;
2914         struct dt_object                *parent = llr->llr_parent->llo_obj;
2915         struct dt_object                *child  = llr->llr_child;
2916         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2917         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2918         struct thandle                  *handle;
2919         struct lu_buf                   *buf;
2920         struct lustre_handle             lh     = { 0 };
2921         int                              rc;
2922         bool                             create;
2923         ENTRY;
2924
2925         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2926                 create = true;
2927         else
2928                 create = false;
2929
2930         if (!create)
2931                 GOTO(log, rc = 1);
2932
2933         memset(cla, 0, sizeof(*cla));
2934         cla->la_uid = pla->la_uid;
2935         cla->la_gid = pla->la_gid;
2936         cla->la_mode = S_IFREG | 0666;
2937         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2938                         LA_ATIME | LA_MTIME | LA_CTIME;
2939
2940         rc = lfsck_layout_lock(env, com, parent, &lh,
2941                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2942         if (rc != 0)
2943                 GOTO(log, rc);
2944
2945         handle = dt_trans_create(env, dev);
2946         if (IS_ERR(handle))
2947                 GOTO(unlock1, rc = PTR_ERR(handle));
2948
2949         hint->dah_parent = NULL;
2950         hint->dah_mode = 0;
2951         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2952         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2953         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2954          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2955          * parent MDT-object's layout EA. */
2956         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2957         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2958
2959         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2960         if (rc != 0)
2961                 GOTO(stop, rc);
2962
2963         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2964                                   LU_XATTR_CREATE, handle);
2965         if (rc != 0)
2966                 GOTO(stop, rc);
2967
2968         rc = dt_trans_start(env, dev, handle);
2969         if (rc != 0)
2970                 GOTO(stop, rc);
2971
2972         dt_read_lock(env, parent, 0);
2973         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2974                 GOTO(unlock2, rc = 1);
2975
2976         rc = dt_create(env, child, cla, hint, NULL, handle);
2977         if (rc != 0)
2978                 GOTO(unlock2, rc);
2979
2980         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2981                           handle, BYPASS_CAPA);
2982
2983         GOTO(unlock2, rc);
2984
2985 unlock2:
2986         dt_read_unlock(env, parent);
2987
2988 stop:
2989         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2990
2991 unlock1:
2992         lfsck_layout_unlock(&lh);
2993
2994 log:
2995         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2996                "reference for: parent "DFID", child "DFID", OST-index %u, "
2997                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2998                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2999                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
3000                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
3001                create ? "Create the lost OST-object as required" :
3002                         "Keep the MDT-object there by default", rc);
3003
3004         return rc;
3005 }
3006
3007 /* If the OST-object does not recognize the MDT-object as its parent, and
3008  * there is no other MDT-object claims as its parent, then just trust the
3009  * given MDT-object as its parent. So update the OST-object filter_fid. */
3010 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
3011                                               struct lfsck_component *com,
3012                                               struct lfsck_layout_req *llr,
3013                                               const struct lu_attr *pla)
3014 {
3015         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3016         struct filter_fid               *pfid   = &info->lti_new_pfid;
3017         struct lu_attr                  *tla    = &info->lti_la3;
3018         struct dt_object                *parent = llr->llr_parent->llo_obj;
3019         struct dt_object                *child  = llr->llr_child;
3020         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3021         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
3022         struct thandle                  *handle;
3023         struct lu_buf                   *buf;
3024         struct lustre_handle             lh     = { 0 };
3025         int                              rc;
3026         ENTRY;
3027
3028         rc = lfsck_layout_lock(env, com, parent, &lh,
3029                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3030         if (rc != 0)
3031                 GOTO(log, rc);
3032
3033         handle = dt_trans_create(env, dev);
3034         if (IS_ERR(handle))
3035                 GOTO(unlock1, rc = PTR_ERR(handle));
3036
3037         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3038         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3039         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3040          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3041          * parent MDT-object's layout EA. */
3042         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3043         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3044
3045         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3046         if (rc != 0)
3047                 GOTO(stop, rc);
3048
3049         tla->la_valid = LA_UID | LA_GID;
3050         tla->la_uid = pla->la_uid;
3051         tla->la_gid = pla->la_gid;
3052         rc = dt_declare_attr_set(env, child, tla, handle);
3053         if (rc != 0)
3054                 GOTO(stop, rc);
3055
3056         rc = dt_trans_start(env, dev, handle);
3057         if (rc != 0)
3058                 GOTO(stop, rc);
3059
3060         dt_write_lock(env, parent, 0);
3061         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3062                 GOTO(unlock2, rc = 1);
3063
3064         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3065                           BYPASS_CAPA);
3066         if (rc != 0)
3067                 GOTO(unlock2, rc);
3068
3069         /* Get the latest parent's owner. */
3070         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3071         if (rc != 0)
3072                 GOTO(unlock2, rc);
3073
3074         tla->la_valid = LA_UID | LA_GID;
3075         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3076
3077         GOTO(unlock2, rc);
3078
3079 unlock2:
3080         dt_write_unlock(env, parent);
3081
3082 stop:
3083         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3084
3085 unlock1:
3086         lfsck_layout_unlock(&lh);
3087
3088 log:
3089         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3090                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3091                "stripe-index %u, owner %u/%u: rc = %d\n",
3092                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3093                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3094                pla->la_uid, pla->la_gid, rc);
3095
3096         return rc;
3097 }
3098
3099 /* If there are more than one MDT-objects claim as the OST-object's parent,
3100  * and the OST-object only recognizes one of them, then we need to generate
3101  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3102 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3103                                                    struct lfsck_component *com,
3104                                                    struct lfsck_layout_req *llr,
3105                                                    struct lu_attr *la,
3106                                                    struct lu_buf *buf)
3107 {
3108         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3109         struct dt_allocation_hint       *hint   = &info->lti_hint;
3110         struct dt_object_format         *dof    = &info->lti_dof;
3111         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3112         struct ost_id                   *oi     = &info->lti_oi;
3113         struct dt_object                *parent = llr->llr_parent->llo_obj;
3114         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3115         struct dt_object                *child  = NULL;
3116         struct lu_device                *d      = &cdev->dd_lu_dev;
3117         struct lu_object                *o      = NULL;
3118         struct thandle                  *handle;
3119         struct lov_mds_md_v1            *lmm;
3120         struct lov_ost_data_v1          *objs;
3121         struct lustre_handle             lh     = { 0 };
3122         struct lu_buf                    ea_buf;
3123         __u32                            magic;
3124         int                              rc;
3125         ENTRY;
3126
3127         rc = lfsck_layout_lock(env, com, parent, &lh,
3128                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3129         if (rc != 0)
3130                 GOTO(log, rc);
3131
3132         handle = dt_trans_create(env, pdev);
3133         if (IS_ERR(handle))
3134                 GOTO(unlock1, rc = PTR_ERR(handle));
3135
3136         o = lu_object_anon(env, d, NULL);
3137         if (IS_ERR(o))
3138                 GOTO(stop, rc = PTR_ERR(o));
3139
3140         child = container_of(o, struct dt_object, do_lu);
3141         o = lu_object_locate(o->lo_header, d->ld_type);
3142         if (unlikely(o == NULL))
3143                 GOTO(stop, rc = -EINVAL);
3144
3145         child = container_of(o, struct dt_object, do_lu);
3146         la->la_valid = LA_UID | LA_GID;
3147         hint->dah_parent = NULL;
3148         hint->dah_mode = 0;
3149         dof->dof_type = DFT_REGULAR;
3150         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
3151         if (rc != 0)
3152                 GOTO(stop, rc);
3153
3154         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3155                                   LU_XATTR_REPLACE, handle);
3156         if (rc != 0)
3157                 GOTO(stop, rc);
3158
3159         rc = dt_trans_start(env, pdev, handle);
3160         if (rc != 0)
3161                 GOTO(stop, rc);
3162
3163         dt_write_lock(env, parent, 0);
3164         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3165                 GOTO(unlock2, rc = 0);
3166
3167         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
3168         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
3169                 GOTO(unlock2, rc = 0);
3170
3171         lmm = buf->lb_buf;
3172         /* Someone change layout during the LFSCK, no need to repair then. */
3173         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
3174                 GOTO(unlock2, rc = 0);
3175
3176         rc = dt_create(env, child, la, hint, dof, handle);
3177         if (rc != 0)
3178                 GOTO(unlock2, rc);
3179
3180         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3181          * been verified in lfsck_layout_verify_header() already. If some
3182          * new magic introduced in the future, then layout LFSCK needs to
3183          * be updated also. */
3184         magic = le32_to_cpu(lmm->lmm_magic);
3185         if (magic == LOV_MAGIC_V1) {
3186                 objs = &lmm->lmm_objects[0];
3187         } else {
3188                 LASSERT(magic == LOV_MAGIC_V3);
3189                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3190         }
3191
3192         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
3193         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
3194         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
3195         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
3196         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
3197         lfsck_buf_init(&ea_buf, lmm,
3198                        lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
3199                                        magic));
3200         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
3201                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3202
3203         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
3204
3205 unlock2:
3206         dt_write_unlock(env, parent);
3207
3208 stop:
3209         if (child != NULL)
3210                 lu_object_put(env, &child->do_lu);
3211
3212         dt_trans_stop(env, pdev, handle);
3213
3214 unlock1:
3215         lfsck_layout_unlock(&lh);
3216
3217 log:
3218         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
3219                "references for: parent "DFID", OST-index %u, stripe-index %u, "
3220                "owner %u/%u: rc = %d\n",
3221                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3222                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3223
3224         return rc;
3225 }
3226
3227 /* If the MDT-object and the OST-object have different owner information,
3228  * then trust the MDT-object, because the normal chown/chgrp handle order
3229  * is from MDT to OST, and it is possible that some chown/chgrp operation
3230  * is partly done. */
3231 static int lfsck_layout_repair_owner(const struct lu_env *env,
3232                                      struct lfsck_component *com,
3233                                      struct lfsck_layout_req *llr,
3234                                      struct lu_attr *pla)
3235 {
3236         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3237         struct lu_attr                  *tla    = &info->lti_la3;
3238         struct dt_object                *parent = llr->llr_parent->llo_obj;
3239         struct dt_object                *child  = llr->llr_child;
3240         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3241         struct thandle                  *handle;
3242         int                              rc;
3243         ENTRY;
3244
3245         handle = dt_trans_create(env, dev);
3246         if (IS_ERR(handle))
3247                 GOTO(log, rc = PTR_ERR(handle));
3248
3249         tla->la_uid = pla->la_uid;
3250         tla->la_gid = pla->la_gid;
3251         tla->la_valid = LA_UID | LA_GID;
3252         rc = dt_declare_attr_set(env, child, tla, handle);
3253         if (rc != 0)
3254                 GOTO(stop, rc);
3255
3256         rc = dt_trans_start(env, dev, handle);
3257         if (rc != 0)
3258                 GOTO(stop, rc);
3259
3260         /* Use the dt_object lock to serialize with destroy and attr_set. */
3261         dt_read_lock(env, parent, 0);
3262         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3263                 GOTO(unlock, rc = 1);
3264
3265         /* Get the latest parent's owner. */
3266         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3267         if (rc != 0)
3268                 GOTO(unlock, rc);
3269
3270         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3271         if (unlikely(tla->la_uid != pla->la_uid ||
3272                      tla->la_gid != pla->la_gid))
3273                 GOTO(unlock, rc = 1);
3274
3275         tla->la_valid = LA_UID | LA_GID;
3276         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3277
3278         GOTO(unlock, rc);
3279
3280 unlock:
3281         dt_read_unlock(env, parent);
3282
3283 stop:
3284         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3285
3286 log:
3287         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3288                "file owner for: parent "DFID", child "DFID", OST-index %u, "
3289                "stripe-index %u, owner %u/%u: rc = %d\n",
3290                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3291                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3292                pla->la_uid, pla->la_gid, rc);
3293
3294         return rc;
3295 }
3296
3297 /* Check whether the OST-object correctly back points to the
3298  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3299 static int lfsck_layout_check_parent(const struct lu_env *env,
3300                                      struct lfsck_component *com,
3301                                      struct dt_object *parent,
3302                                      const struct lu_fid *pfid,
3303                                      const struct lu_fid *cfid,
3304                                      const struct lu_attr *pla,
3305                                      const struct lu_attr *cla,
3306                                      struct lfsck_layout_req *llr,
3307                                      struct lu_buf *lov_ea, __u32 idx)
3308 {
3309         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3310         struct lu_buf                   *buf    = &info->lti_big_buf;
3311         struct dt_object                *tobj;
3312         struct lov_mds_md_v1            *lmm;
3313         struct lov_ost_data_v1          *objs;
3314         int                              rc;
3315         int                              i;
3316         __u32                            magic;
3317         __u16                            count;
3318         ENTRY;
3319
3320         if (fid_is_zero(pfid)) {
3321                 /* client never wrote. */
3322                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3323                         if (unlikely(cla->la_uid != pla->la_uid ||
3324                                      cla->la_gid != pla->la_gid))
3325                                 RETURN (LLIT_INCONSISTENT_OWNER);
3326
3327                         RETURN(0);
3328                 }
3329
3330                 RETURN(LLIT_UNMATCHED_PAIR);
3331         }
3332
3333         if (unlikely(!fid_is_sane(pfid)))
3334                 RETURN(LLIT_UNMATCHED_PAIR);
3335
3336         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3337                 if (llr->llr_lov_idx == idx)
3338                         RETURN(0);
3339
3340                 RETURN(LLIT_UNMATCHED_PAIR);
3341         }
3342
3343         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3344         if (IS_ERR(tobj))
3345                 RETURN(PTR_ERR(tobj));
3346
3347         dt_read_lock(env, tobj, 0);
3348         if (dt_object_exists(tobj) == 0 ||
3349             lfsck_is_dead_obj(tobj))
3350                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3351
3352         if (!S_ISREG(lfsck_object_type(tobj)))
3353                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3354
3355         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3356          * remote one on another MDT. Then check whether the given OST-object
3357          * is in such layout. If yes, it is multiple referenced, otherwise it
3358          * is unmatched referenced case. */
3359         rc = lfsck_layout_get_lovea(env, tobj, buf);
3360         if (rc == 0 || rc == -ENOENT)
3361                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3362
3363         if (rc < 0)
3364                 GOTO(out, rc);
3365
3366         lmm = buf->lb_buf;
3367         magic = le32_to_cpu(lmm->lmm_magic);
3368         if (magic == LOV_MAGIC_V1) {
3369                 objs = &lmm->lmm_objects[0];
3370         } else {
3371                 LASSERT(magic == LOV_MAGIC_V3);
3372                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3373         }
3374
3375         count = le16_to_cpu(lmm->lmm_stripe_count);
3376         for (i = 0; i < count; i++, objs++) {
3377                 struct lu_fid           *tfid   = &info->lti_fid2;
3378                 struct ost_id           *oi     = &info->lti_oi;
3379                 __u32                    idx2;
3380
3381                 if (lovea_slot_is_dummy(objs))
3382                         continue;
3383
3384                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3385                 idx2 = le32_to_cpu(objs->l_ost_idx);
3386                 rc = ostid_to_fid(tfid, oi, idx2);
3387                 if (rc != 0) {
3388                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3389                                "invalid layout EA at the slot %d, index %u\n",
3390                                lfsck_lfsck2name(com->lc_lfsck),
3391                                PFID(pfid), i, idx2);
3392
3393                         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3394                 }
3395
3396                 if (lu_fid_eq(cfid, tfid)) {
3397                         *lov_ea = *buf;
3398
3399                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3400                 }
3401         }
3402
3403         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3404
3405 out:
3406         dt_read_unlock(env, tobj);
3407         lfsck_object_put(env, tobj);
3408
3409         return rc;
3410 }
3411
3412 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3413                                              struct lfsck_component *com,
3414                                              struct lfsck_layout_req *llr)
3415 {
3416         struct lfsck_layout                  *lo     = com->lc_file_ram;
3417         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3418         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3419         struct lu_fid                        *pfid   = &info->lti_fid;
3420         struct lu_buf                         buf    = { 0 };
3421         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3422         struct dt_object                     *child  = llr->llr_child;
3423         struct lu_attr                       *pla    = &info->lti_la;
3424         struct lu_attr                       *cla    = &info->lti_la2;
3425         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3426         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3427         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3428         __u32                                 idx    = 0;
3429         int                                   rc;
3430         ENTRY;
3431
3432         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3433         if (rc != 0) {
3434                 if (lu_object_is_dying(parent->do_lu.lo_header))
3435                         RETURN(0);
3436
3437                 GOTO(out, rc);
3438         }
3439
3440         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3441         if (rc == -ENOENT) {
3442                 if (lu_object_is_dying(parent->do_lu.lo_header))
3443                         RETURN(0);
3444
3445                 type = LLIT_DANGLING;
3446                 goto repair;
3447         }
3448
3449         if (rc != 0)
3450                 GOTO(out, rc);
3451
3452         lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3453         rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3454         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3455                      rc != sizeof(struct filter_fid))) {
3456                 type = LLIT_UNMATCHED_PAIR;
3457                 goto repair;
3458         }
3459
3460         if (rc < 0 && rc != -ENODATA)
3461                 GOTO(out, rc);
3462
3463         if (rc == -ENODATA) {
3464                 fid_zero(pfid);
3465         } else {
3466                 fid_le_to_cpu(pfid, &pea->ff_parent);
3467                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3468                  * real parent MDT-object's FID::f_ver, instead it is the
3469                  * OST-object index in its parent MDT-object's layout EA. */
3470                 idx = pfid->f_stripe_idx;
3471                 pfid->f_ver = 0;
3472         }
3473
3474         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3475                                        lu_object_fid(&child->do_lu),
3476                                        pla, cla, llr, &buf, idx);
3477         if (rc > 0) {
3478                 type = rc;
3479                 goto repair;
3480         }
3481
3482         if (rc < 0)
3483                 GOTO(out, rc);
3484
3485         if (unlikely(cla->la_uid != pla->la_uid ||
3486                      cla->la_gid != pla->la_gid)) {
3487                 type = LLIT_INCONSISTENT_OWNER;
3488                 goto repair;
3489         }
3490
3491 repair:
3492         if (bk->lb_param & LPF_DRYRUN) {
3493                 if (type != LLIT_NONE)
3494                         GOTO(out, rc = 1);
3495                 else
3496                         GOTO(out, rc = 0);
3497         }
3498
3499         switch (type) {
3500         case LLIT_DANGLING:
3501                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3502                 break;
3503         case LLIT_UNMATCHED_PAIR:
3504                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3505                 break;
3506         case LLIT_MULTIPLE_REFERENCED:
3507                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3508                                                              pla, &buf);
3509                 break;
3510         case LLIT_INCONSISTENT_OWNER:
3511                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3512                 break;
3513         default:
3514                 rc = 0;
3515                 break;
3516         }
3517
3518         GOTO(out, rc);
3519
3520 out:
3521         down_write(&com->lc_sem);
3522         if (rc < 0) {
3523                 struct lfsck_layout_master_data *llmd = com->lc_data;
3524
3525                 if (unlikely(llmd->llmd_exit)) {
3526                         rc = 0;
3527                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3528                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3529                            rc == -EHOSTUNREACH) {
3530                         /* If cannot touch the target server,
3531                          * mark the LFSCK as INCOMPLETE. */
3532                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3533                                "talk with OST %x: rc = %d\n",
3534                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3535                         lo->ll_flags |= LF_INCOMPLETE;
3536                         lo->ll_objs_skipped++;
3537                         rc = 0;
3538                 } else {
3539                         lfsck_layout_record_failure(env, lfsck, lo);
3540                 }
3541         } else if (rc > 0) {
3542                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3543                          "unknown type = %d\n", type);
3544
3545                 lo->ll_objs_repaired[type - 1]++;
3546                 if (bk->lb_param & LPF_DRYRUN &&
3547                     unlikely(lo->ll_pos_first_inconsistent == 0))
3548                         lo->ll_pos_first_inconsistent =
3549                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3550                                                         lfsck->li_di_oit);
3551         }
3552         up_write(&com->lc_sem);
3553
3554         return rc;
3555 }
3556
3557 static int lfsck_layout_assistant(void *args)
3558 {
3559         struct lfsck_thread_args        *lta     = args;
3560         struct lu_env                   *env     = &lta->lta_env;
3561         struct lfsck_component          *com     = lta->lta_com;
3562         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3563         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3564         struct lfsck_position           *pos     = &com->lc_pos_start;
3565         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3566         struct lfsck_request            *lr      = &info->lti_lr;
3567         struct lfsck_layout_master_data *llmd    = com->lc_data;
3568         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3569         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3570         struct lfsck_layout_req         *llr;
3571         struct l_wait_info               lwi     = { 0 };
3572         int                              rc      = 0;
3573         int                              rc1     = 0;
3574         ENTRY;
3575
3576         memset(lr, 0, sizeof(*lr));
3577         lr->lr_event = LE_START;
3578         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3579                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
3580         lr->lr_speed = bk->lb_speed_limit;
3581         lr->lr_version = bk->lb_version;
3582         lr->lr_param = bk->lb_param;
3583         lr->lr_async_windows = bk->lb_async_windows;
3584         lr->lr_flags = LEF_TO_OST;
3585         if (pos->lp_oit_cookie <= 1)
3586                 lr->lr_param |= LPF_RESET;
3587
3588         rc = lfsck_layout_master_notify_others(env, com, lr);
3589         if (rc != 0) {
3590                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to notify "
3591                        "others for LFSCK start: rc = %d\n",
3592                        lfsck_lfsck2name(lfsck), rc);
3593                 GOTO(fini, rc);
3594         }
3595
3596         spin_lock(&llmd->llmd_lock);
3597         thread_set_flags(athread, SVC_RUNNING);
3598         spin_unlock(&llmd->llmd_lock);
3599         wake_up_all(&mthread->t_ctl_waitq);
3600
3601         while (1) {
3602                 while (!list_empty(&llmd->llmd_req_list)) {
3603                         bool wakeup = false;
3604
3605                         if (unlikely(llmd->llmd_exit ||
3606                                      !thread_is_running(mthread)))
3607                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3608
3609                         llr = list_entry(llmd->llmd_req_list.next,
3610                                          struct lfsck_layout_req,
3611                                          llr_list);
3612                         /* Only the lfsck_layout_assistant thread itself can
3613                          * remove the "llr" from the head of the list, LFSCK
3614                          * engine thread only inserts other new "lld" at the
3615                          * end of the list. So it is safe to handle current
3616                          * "llr" without the spin_lock. */
3617                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3618                         spin_lock(&llmd->llmd_lock);
3619                         list_del_init(&llr->llr_list);
3620                         llmd->llmd_prefetched--;
3621                         /* Wake up the main engine thread only when the list
3622                          * is empty or half of the prefetched items have been
3623                          * handled to avoid too frequent thread schedule. */
3624                         if (llmd->llmd_prefetched == 0 ||
3625                             (bk->lb_async_windows != 0 &&
3626                              bk->lb_async_windows / 2 ==
3627                              llmd->llmd_prefetched))
3628                                 wakeup = true;
3629                         spin_unlock(&llmd->llmd_lock);
3630                         if (wakeup)
3631                                 wake_up_all(&mthread->t_ctl_waitq);
3632
3633                         lfsck_layout_req_fini(env, llr);
3634                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3635                                 GOTO(cleanup1, rc);
3636                 }
3637
3638                 l_wait_event(athread->t_ctl_waitq,
3639                              !lfsck_layout_req_empty(llmd) ||
3640                              llmd->llmd_exit ||
3641                              llmd->llmd_to_post ||
3642                              llmd->llmd_to_double_scan,
3643                              &lwi);
3644
3645                 if (unlikely(llmd->llmd_exit))
3646                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3647
3648                 if (!list_empty(&llmd->llmd_req_list))
3649                         continue;
3650
3651                 if (llmd->llmd_to_post) {
3652                         llmd->llmd_to_post = 0;
3653                         LASSERT(llmd->llmd_post_result > 0);
3654
3655                         memset(lr, 0, sizeof(*lr));
3656                         lr->lr_event = LE_PHASE1_DONE;
3657                         lr->lr_status = llmd->llmd_post_result;
3658                         rc = lfsck_layout_master_notify_others(env, com, lr);
3659                         if (rc != 0)
3660                                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant "
3661                                        "failed to notify others for LFSCK "
3662                                        "post: rc = %d\n",
3663                                        lfsck_lfsck2name(lfsck), rc);
3664
3665                         /* Wakeup the master engine to go ahead. */
3666                         wake_up_all(&mthread->t_ctl_waitq);
3667                 }
3668
3669                 if (llmd->llmd_to_double_scan) {
3670                         llmd->llmd_to_double_scan = 0;
3671                         atomic_inc(&lfsck->li_double_scan_count);
3672                         llmd->llmd_in_double_scan = 1;
3673                         wake_up_all(&mthread->t_ctl_waitq);
3674
3675                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 "
3676                                "scan start\n", lfsck_lfsck2name(lfsck));
3677
3678                         com->lc_new_checked = 0;
3679                         com->lc_new_scanned = 0;
3680                         com->lc_time_last_checkpoint = cfs_time_current();
3681                         com->lc_time_next_checkpoint =
3682                                 com->lc_time_last_checkpoint +
3683                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3684
3685                         /* flush all async updating before handling orphan. */
3686                         dt_sync(env, lfsck->li_next);
3687
3688                         while (llmd->llmd_in_double_scan) {
3689                                 struct lfsck_tgt_descs  *ltds =
3690                                                         &lfsck->li_ost_descs;
3691                                 struct lfsck_tgt_desc   *ltd;
3692
3693                                 rc = lfsck_layout_master_query_others(env, com);
3694                                 if (lfsck_layout_master_to_orphan(llmd))
3695                                         goto orphan;
3696
3697                                 if (rc < 0)
3698                                         GOTO(cleanup2, rc);
3699
3700                                 /* Pull LFSCK status on related targets once
3701                                  * per 30 seconds if we are not notified. */
3702                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3703                                                            cfs_time_seconds(1),
3704                                                            NULL, NULL);
3705                                 rc = l_wait_event(athread->t_ctl_waitq,
3706                                         lfsck_layout_master_to_orphan(llmd) ||
3707                                         llmd->llmd_exit ||
3708                                         !thread_is_running(mthread),
3709                                         &lwi);
3710
3711                                 if (unlikely(llmd->llmd_exit ||
3712                                              !thread_is_running(mthread)))
3713                                         GOTO(cleanup2, rc = 0);
3714
3715                                 if (rc == -ETIMEDOUT)
3716                                         continue;
3717
3718                                 if (rc < 0)
3719                                         GOTO(cleanup2, rc);
3720
3721 orphan:
3722                                 spin_lock(&ltds->ltd_lock);
3723                                 while (!list_empty(
3724                                                 &llmd->llmd_ost_phase2_list)) {
3725                                         ltd = list_entry(
3726                                               llmd->llmd_ost_phase2_list.next,
3727                                               struct lfsck_tgt_desc,
3728                                               ltd_layout_phase_list);
3729                                         list_del_init(
3730                                                 &ltd->ltd_layout_phase_list);
3731                                         spin_unlock(&ltds->ltd_lock);
3732
3733                                         if (bk->lb_param & LPF_ALL_TGT) {
3734                                                 rc = lfsck_layout_scan_orphan(
3735                                                                 env, com, ltd);
3736                                                 if (rc != 0 &&
3737                                                     bk->lb_param & LPF_FAILOUT)
3738                                                         GOTO(cleanup2, rc);
3739                                         }
3740
3741                                         if (unlikely(llmd->llmd_exit ||
3742                                                 !thread_is_running(mthread)))
3743                                                 GOTO(cleanup2, rc = 0);
3744
3745                                         spin_lock(&ltds->ltd_lock);
3746                                 }
3747
3748                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3749                                         spin_unlock(&ltds->ltd_lock);
3750                                         GOTO(cleanup2, rc = 1);
3751                                 }
3752                                 spin_unlock(&ltds->ltd_lock);
3753                         }
3754                 }
3755         }
3756
3757 cleanup1:
3758         /* Cleanup the unfinished requests. */
3759         spin_lock(&llmd->llmd_lock);
3760         if (rc < 0)
3761                 llmd->llmd_assistant_status = rc;
3762
3763         while (!list_empty(&llmd->llmd_req_list)) {
3764                 llr = list_entry(llmd->llmd_req_list.next,
3765                                  struct lfsck_layout_req,
3766                                  llr_list);
3767                 list_del_init(&llr->llr_list);
3768                 llmd->llmd_prefetched--;
3769                 spin_unlock(&llmd->llmd_lock);
3770                 lfsck_layout_req_fini(env, llr);
3771                 spin_lock(&llmd->llmd_lock);
3772         }
3773         spin_unlock(&llmd->llmd_lock);
3774
3775         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3776                  llmd->llmd_prefetched);
3777
3778 cleanup2:
3779         memset(lr, 0, sizeof(*lr));
3780         if (rc > 0) {
3781                 lr->lr_event = LE_PHASE2_DONE;
3782                 lr->lr_status = rc;
3783         } else if (rc == 0) {
3784                 if (lfsck->li_flags & LPF_ALL_TGT) {
3785                         lr->lr_event = LE_STOP;
3786                         lr->lr_status = LS_STOPPED;
3787                 } else {
3788                         lr->lr_event = LE_PEER_EXIT;
3789                         switch (lfsck->li_status) {
3790                         case LS_PAUSED:
3791                         case LS_CO_PAUSED:
3792                                 lr->lr_status = LS_CO_PAUSED;
3793                                 break;
3794                         case LS_STOPPED:
3795                         case LS_CO_STOPPED:
3796                                 lr->lr_status = LS_CO_STOPPED;
3797                                 break;
3798                         default:
3799                                 CDEBUG(D_LFSCK, "%s: unknown status: rc = %d\n",
3800                                        lfsck_lfsck2name(lfsck),
3801                                        lfsck->li_status);
3802                                 lr->lr_status = LS_CO_FAILED;
3803                                 break;
3804                         }
3805                 }
3806         } else {
3807                 if (lfsck->li_flags & LPF_ALL_TGT) {
3808                         lr->lr_event = LE_STOP;
3809                         lr->lr_status = LS_FAILED;
3810                 } else {
3811                         lr->lr_event = LE_PEER_EXIT;
3812                         lr->lr_status = LS_CO_FAILED;
3813                 }
3814         }
3815
3816         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3817         if (rc1 != 0) {
3818                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to "
3819                        "notify others for LFSCK quit: rc = %d\n",
3820                        lfsck_lfsck2name(lfsck), rc1);
3821                 rc = rc1;
3822         }
3823
3824         /* flush all async updating before exit. */
3825         dt_sync(env, lfsck->li_next);
3826
3827         /* Under force exit case, some requests may be just freed without
3828          * verification, those objects should be re-handled when next run.
3829          * So not update the on-disk tracing file under such case. */
3830         if (llmd->llmd_in_double_scan) {
3831                 struct lfsck_layout *lo = com->lc_file_ram;
3832
3833                 if (!llmd->llmd_exit)
3834                         rc1 = lfsck_layout_double_scan_result(env, com, rc);
3835
3836                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 scan "
3837                        "finished, status %d: rc = %d\n",
3838                        lfsck_lfsck2name(lfsck), lo->ll_status, rc1);
3839         }
3840
3841 fini:
3842         if (llmd->llmd_in_double_scan)
3843                 atomic_dec(&lfsck->li_double_scan_count);
3844
3845         spin_lock(&llmd->llmd_lock);
3846         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3847         thread_set_flags(athread, SVC_STOPPED);
3848         wake_up_all(&mthread->t_ctl_waitq);
3849         spin_unlock(&llmd->llmd_lock);
3850         lfsck_thread_args_fini(lta);
3851
3852         return rc;
3853 }
3854
3855 static int
3856 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3857                                    struct ptlrpc_request *req,
3858                                    void *args, int rc)
3859 {
3860         struct lfsck_layout_slave_async_args *llsaa = args;
3861         struct obd_export                    *exp   = llsaa->llsaa_exp;
3862         struct lfsck_component               *com   = llsaa->llsaa_com;
3863         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3864         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3865         struct lfsck_reply                   *lr    = NULL;
3866         bool                                  done  = false;
3867
3868         if (rc != 0) {
3869                 /* It is quite probably caused by target crash,
3870                  * to make the LFSCK can go ahead, assume that
3871                  * the target finished the LFSCK prcoessing. */
3872                 done = true;
3873         } else {
3874                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3875                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3876                     lr->lr_status != LS_SCANNING_PHASE2)
3877                         done = true;
3878         }
3879
3880         if (done) {
3881                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3882                        "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3883                        llst->llst_index, lr != NULL ? lr->lr_status : rc);
3884
3885                 lfsck_layout_llst_del(llsd, llst);
3886         }
3887
3888         lfsck_layout_llst_put(llst);
3889         lfsck_component_put(env, com);
3890         class_export_put(exp);
3891
3892         return 0;
3893 }
3894
3895 static int lfsck_layout_async_query(const struct lu_env *env,
3896                                     struct lfsck_component *com,
3897                                     struct obd_export *exp,
3898                                     struct lfsck_layout_slave_target *llst,
3899                                     struct lfsck_request *lr,
3900                                     struct ptlrpc_request_set *set)
3901 {
3902         struct lfsck_layout_slave_async_args *llsaa;
3903         struct ptlrpc_request                *req;
3904         struct lfsck_request                 *tmp;
3905         int                                   rc;
3906         ENTRY;
3907
3908         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3909         if (req == NULL)
3910                 RETURN(-ENOMEM);
3911
3912         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3913         if (rc != 0) {
3914                 ptlrpc_request_free(req);
3915                 RETURN(rc);
3916         }
3917
3918         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3919         *tmp = *lr;
3920         ptlrpc_request_set_replen(req);
3921
3922         llsaa = ptlrpc_req_async_args(req);
3923         llsaa->llsaa_exp = exp;
3924         llsaa->llsaa_com = lfsck_component_get(com);
3925         llsaa->llsaa_llst = llst;
3926         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3927         ptlrpc_set_add_req(set, req);
3928
3929         RETURN(0);
3930 }
3931
3932 static int lfsck_layout_async_notify(const struct lu_env *env,
3933                                      struct obd_export *exp,
3934                                      struct lfsck_request *lr,
3935                                      struct ptlrpc_request_set *set)
3936 {
3937         struct ptlrpc_request   *req;
3938         struct lfsck_request    *tmp;
3939         int                      rc;
3940         ENTRY;
3941
3942         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3943         if (req == NULL)
3944                 RETURN(-ENOMEM);
3945
3946         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3947         if (rc != 0) {
3948                 ptlrpc_request_free(req);
3949                 RETURN(rc);
3950         }
3951
3952         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3953         *tmp = *lr;
3954         ptlrpc_request_set_replen(req);
3955         ptlrpc_set_add_req(set, req);
3956
3957         RETURN(0);
3958 }
3959
3960 static int
3961 lfsck_layout_slave_query_master(const struct lu_env *env,
3962                                 struct lfsck_component *com)
3963 {
3964         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3965         struct lfsck_instance            *lfsck = com->lc_lfsck;
3966         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3967         struct lfsck_layout_slave_target *llst;
3968         struct obd_export                *exp;
3969         struct ptlrpc_request_set        *set;
3970         int                               rc    = 0;
3971         int                               rc1   = 0;
3972         ENTRY;
3973
3974         set = ptlrpc_prep_set();
3975         if (set == NULL)
3976                 GOTO(log, rc = -ENOMEM);
3977
3978         memset(lr, 0, sizeof(*lr));
3979         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3980         lr->lr_event = LE_QUERY;
3981         lr->lr_active = LFSCK_TYPE_LAYOUT;
3982
3983         llsd->llsd_touch_gen++;
3984         spin_lock(&llsd->llsd_lock);
3985         while (!list_empty(&llsd->llsd_master_list)) {
3986                 llst = list_entry(llsd->llsd_master_list.next,
3987                                   struct lfsck_layout_slave_target,
3988                                   llst_list);
3989                 if (llst->llst_gen == llsd->llsd_touch_gen)
3990                         break;
3991
3992                 llst->llst_gen = llsd->llsd_touch_gen;
3993                 list_move_tail(&llst->llst_list,
3994                                &llsd->llsd_master_list);
3995                 atomic_inc(&llst->llst_ref);
3996                 spin_unlock(&llsd->llsd_lock);
3997
3998                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3999                                                llst->llst_index);
4000                 if (exp == NULL) {
4001                         lfsck_layout_llst_del(llsd, llst);
4002                         lfsck_layout_llst_put(llst);
4003                         spin_lock(&llsd->llsd_lock);
4004                         continue;
4005                 }
4006
4007                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
4008                 if (rc != 0) {
4009                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4010                                "query %s for layout: rc = %d\n",
4011                                lfsck_lfsck2name(lfsck),
4012                                exp->exp_obd->obd_name, rc);
4013
4014                         rc1 = rc;
4015                         lfsck_layout_llst_put(llst);
4016                         class_export_put(exp);
4017                 }
4018                 spin_lock(&llsd->llsd_lock);
4019         }
4020         spin_unlock(&llsd->llsd_lock);
4021
4022         rc = ptlrpc_set_wait(set);
4023         ptlrpc_set_destroy(set);
4024
4025         GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
4026
4027 log:
4028         CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
4029                lfsck_lfsck2name(com->lc_lfsck), rc);
4030
4031         return rc;
4032 }
4033
4034 static void
4035 lfsck_layout_slave_notify_master(const struct lu_env *env,
4036                                  struct lfsck_component *com,
4037                                  enum lfsck_events event, int result)
4038 {
4039         struct lfsck_instance            *lfsck = com->lc_lfsck;
4040         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4041         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
4042         struct lfsck_layout_slave_target *llst;
4043         struct obd_export                *exp;
4044         struct ptlrpc_request_set        *set;
4045         int                               rc;
4046         ENTRY;
4047
4048         CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
4049                lfsck_lfsck2name(com->lc_lfsck));
4050
4051         set = ptlrpc_prep_set();
4052         if (set == NULL)
4053                 RETURN_EXIT;
4054
4055         memset(lr, 0, sizeof(*lr));
4056         lr->lr_event = event;
4057         lr->lr_flags = LEF_FROM_OST;
4058         lr->lr_status = result;
4059         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4060         lr->lr_active = LFSCK_TYPE_LAYOUT;
4061         llsd->llsd_touch_gen++;
4062         spin_lock(&llsd->llsd_lock);
4063         while (!list_empty(&llsd->llsd_master_list)) {
4064                 llst = list_entry(llsd->llsd_master_list.next,
4065                                   struct lfsck_layout_slave_target,
4066                                   llst_list);
4067                 if (llst->llst_gen == llsd->llsd_touch_gen)
4068                         break;
4069
4070                 llst->llst_gen = llsd->llsd_touch_gen;
4071                 list_move_tail(&llst->llst_list,
4072                                &llsd->llsd_master_list);
4073                 atomic_inc(&llst->llst_ref);
4074                 spin_unlock(&llsd->llsd_lock);
4075
4076                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
4077                                                llst->llst_index);
4078                 if (exp == NULL) {
4079                         lfsck_layout_llst_del(llsd, llst);
4080                         lfsck_layout_llst_put(llst);
4081                         spin_lock(&llsd->llsd_lock);
4082                         continue;
4083                 }
4084
4085                 rc = lfsck_layout_async_notify(env, exp, lr, set);
4086                 if (rc != 0)
4087                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4088                                "notify %s for layout: rc = %d\n",
4089                                lfsck_lfsck2name(lfsck),
4090                                exp->exp_obd->obd_name, rc);
4091
4092                 lfsck_layout_llst_put(llst);
4093                 class_export_put(exp);
4094                 spin_lock(&llsd->llsd_lock);
4095         }
4096         spin_unlock(&llsd->llsd_lock);
4097
4098         ptlrpc_set_wait(set);
4099         ptlrpc_set_destroy(set);
4100
4101         RETURN_EXIT;
4102 }
4103
4104 /*
4105  * \ret -ENODATA: unrecognized stripe
4106  * \ret = 0     : recognized stripe
4107  * \ret < 0     : other failures
4108  */
4109 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
4110                                            struct lfsck_component *com,
4111                                            struct lu_fid *cfid,
4112                                            struct lu_fid *pfid)
4113 {
4114         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4115         struct lu_buf                   *buf    = &info->lti_big_buf;
4116         struct ost_id                   *oi     = &info->lti_oi;
4117         struct dt_object                *obj;
4118         struct lov_mds_md_v1            *lmm;
4119         struct lov_ost_data_v1          *objs;
4120         __u32                            idx    = pfid->f_stripe_idx;
4121         __u32                            magic;
4122         int                              rc     = 0;
4123         int                              i;
4124         __u16                            count;
4125         ENTRY;
4126
4127         pfid->f_ver = 0;
4128         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
4129         if (IS_ERR(obj))
4130                 RETURN(PTR_ERR(obj));
4131
4132         dt_read_lock(env, obj, 0);
4133         if (unlikely(dt_object_exists(obj) == 0 ||
4134                      lfsck_is_dead_obj(obj)))
4135                 GOTO(unlock, rc = -ENOENT);
4136
4137         if (!S_ISREG(lfsck_object_type(obj)))
4138                 GOTO(unlock, rc = -ENODATA);
4139
4140         rc = lfsck_layout_get_lovea(env, obj, buf);
4141         if (rc < 0)
4142                 GOTO(unlock, rc);
4143
4144         if (rc == 0)
4145                 GOTO(unlock, rc = -ENODATA);
4146
4147         lmm = buf->lb_buf;
4148         rc = lfsck_layout_verify_header(lmm);
4149         if (rc != 0)
4150                 GOTO(unlock, rc);
4151
4152         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4153          * been verified in lfsck_layout_verify_header() already. If some
4154          * new magic introduced in the future, then layout LFSCK needs to
4155          * be updated also. */
4156         magic = le32_to_cpu(lmm->lmm_magic);
4157         if (magic == LOV_MAGIC_V1) {
4158                 objs = &lmm->lmm_objects[0];
4159         } else {
4160                 LASSERT(magic == LOV_MAGIC_V3);
4161                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4162         }
4163
4164         fid_to_ostid(cfid, oi);
4165         count = le16_to_cpu(lmm->lmm_stripe_count);
4166         for (i = 0; i < count; i++, objs++) {
4167                 struct ost_id oi2;
4168
4169                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
4170                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
4171                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
4172         }
4173
4174         GOTO(unlock, rc = -ENODATA);
4175
4176 unlock:
4177         dt_read_unlock(env, obj);
4178         lu_object_put(env, &obj->do_lu);
4179
4180         return rc;
4181 }
4182
4183 /*
4184  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
4185  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
4186  * layout EA from MDT to OST. On one hand, the OST no need to understand
4187  * the layout EA structure; on the other hand, it may cause trouble when
4188  * transfer large layout EA from MDT to OST via normal OUT RPC.
4189  *
4190  * \ret > 0: unrecognized stripe
4191  * \ret = 0: recognized stripe
4192  * \ret < 0: other failures
4193  */
4194 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
4195                                           struct lfsck_component *com,
4196                                           struct lu_fid *cfid,
4197                                           struct lu_fid *pfid)
4198 {
4199         struct lfsck_instance    *lfsck  = com->lc_lfsck;
4200         struct obd_device        *obd    = lfsck->li_obd;
4201         struct seq_server_site   *ss     =
4202                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4203         struct obd_export        *exp    = NULL;
4204         struct ptlrpc_request    *req    = NULL;
4205         struct lfsck_request     *lr;
4206         struct lu_seq_range       range  = { 0 };
4207         int                       rc     = 0;
4208         ENTRY;
4209
4210         if (unlikely(fid_is_idif(pfid)))
4211                 RETURN(1);
4212
4213         fld_range_set_any(&range);
4214         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
4215         if (rc != 0)
4216                 RETURN(rc == -ENOENT ? 1 : rc);
4217
4218         if (unlikely(!fld_range_is_mdt(&range)))
4219                 RETURN(1);
4220
4221         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
4222         if (unlikely(exp == NULL))
4223                 RETURN(1);
4224
4225         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
4226                 GOTO(out, rc = -EOPNOTSUPP);
4227
4228         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
4229         if (req == NULL)
4230                 GOTO(out, rc = -ENOMEM);
4231
4232         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
4233         if (rc != 0) {
4234                 ptlrpc_request_free(req);
4235
4236                 GOTO(out, rc);
4237         }
4238
4239         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
4240         memset(lr, 0, sizeof(*lr));
4241         lr->lr_event = LE_PAIRS_VERIFY;
4242         lr->lr_active = LFSCK_TYPE_LAYOUT;
4243         lr->lr_fid = *cfid; /* OST-object itself FID. */
4244         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
4245
4246         ptlrpc_request_set_replen(req);
4247         rc = ptlrpc_queue_wait(req);
4248         ptlrpc_req_finished(req);
4249
4250         if (rc == -ENOENT || rc == -ENODATA)
4251                 rc = 1;
4252
4253         GOTO(out, rc);
4254
4255 out:
4256         if (exp != NULL)
4257                 class_export_put(exp);
4258
4259         return rc;
4260 }
4261
4262 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
4263                                           struct lfsck_component *com,
4264                                           struct lfsck_request *lr)
4265 {
4266         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4267         struct filter_fid               *ff     = &info->lti_new_pfid;
4268         struct lu_buf                   *buf;
4269         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
4270         struct dt_object                *obj;
4271         struct thandle                  *th     = NULL;
4272         int                              rc     = 0;
4273         ENTRY;
4274
4275         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
4276         if (IS_ERR(obj))
4277                 GOTO(log, rc = PTR_ERR(obj));
4278
4279         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
4280         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4281         dt_write_lock(env, obj, 0);
4282         if (unlikely(dt_object_exists(obj) == 0 ||
4283                      lfsck_is_dead_obj(obj)))
4284                 GOTO(unlock, rc = 0);
4285
4286         th = dt_trans_create(env, dev);
4287         if (IS_ERR(th))
4288                 GOTO(unlock, rc = PTR_ERR(th));
4289
4290         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4291         if (rc != 0)
4292                 GOTO(stop, rc);
4293
4294         rc = dt_trans_start_local(env, dev, th);
4295         if (rc != 0)
4296                 GOTO(stop, rc);
4297
4298         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4299
4300         GOTO(stop, rc);
4301
4302 stop:
4303         dt_trans_stop(env, dev, th);
4304
4305 unlock:
4306         dt_write_unlock(env, obj);
4307         lu_object_put(env, &obj->do_lu);
4308
4309 log:
4310         CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
4311                ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
4312                PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
4313
4314         return rc;
4315 }
4316
4317 /* layout APIs */
4318
4319 static int lfsck_layout_reset(const struct lu_env *env,
4320                               struct lfsck_component *com, bool init)
4321 {
4322         struct lfsck_layout     *lo    = com->lc_file_ram;
4323         int                      rc;
4324
4325         down_write(&com->lc_sem);
4326         if (init) {
4327                 memset(lo, 0, com->lc_file_size);
4328         } else {
4329                 __u32 count = lo->ll_success_count;
4330                 __u64 last_time = lo->ll_time_last_complete;
4331
4332                 memset(lo, 0, com->lc_file_size);
4333                 lo->ll_success_count = count;
4334                 lo->ll_time_last_complete = last_time;
4335         }
4336
4337         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4338         lo->ll_status = LS_INIT;
4339
4340         rc = lfsck_layout_store(env, com);
4341         up_write(&com->lc_sem);
4342
4343         CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
4344                lfsck_lfsck2name(com->lc_lfsck), rc);
4345
4346         return rc;
4347 }
4348
4349 static void lfsck_layout_fail(const struct lu_env *env,
4350                               struct lfsck_component *com, bool new_checked)
4351 {
4352         struct lfsck_layout *lo = com->lc_file_ram;
4353
4354         down_write(&com->lc_sem);
4355         if (new_checked)
4356                 com->lc_new_checked++;
4357         lfsck_layout_record_failure(env, com->lc_lfsck, lo);
4358         up_write(&com->lc_sem);
4359 }
4360
4361 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4362                                           struct lfsck_component *com, bool init)
4363 {
4364         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4365         struct lfsck_layout             *lo      = com->lc_file_ram;
4366         struct lfsck_layout_master_data *llmd    = com->lc_data;
4367         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4368         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4369         struct l_wait_info               lwi     = { 0 };
4370         int                              rc;
4371
4372         if (com->lc_new_checked == 0 && !init)
4373                 return 0;
4374
4375         l_wait_event(mthread->t_ctl_waitq,
4376                      list_empty(&llmd->llmd_req_list) ||
4377                      !thread_is_running(mthread) ||
4378                      thread_is_stopped(athread),
4379                      &lwi);
4380
4381         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4382                 return 0;
4383
4384         down_write(&com->lc_sem);
4385         if (init) {
4386                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4387         } else {
4388                 lo->ll_pos_last_checkpoint =
4389                                         lfsck->li_pos_current.lp_oit_cookie;
4390                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4391                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4392                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4393                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4394                 com->lc_new_checked = 0;
4395         }
4396
4397         rc = lfsck_layout_store(env, com);
4398         up_write(&com->lc_sem);
4399
4400         CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
4401                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4402                lfsck->li_pos_current.lp_oit_cookie, rc);
4403
4404         return rc;
4405 }
4406
4407 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4408                                          struct lfsck_component *com, bool init)
4409 {
4410         struct lfsck_instance   *lfsck = com->lc_lfsck;
4411         struct lfsck_layout     *lo    = com->lc_file_ram;
4412         int                      rc;
4413
4414         if (com->lc_new_checked == 0 && !init)
4415                 return 0;
4416
4417         down_write(&com->lc_sem);
4418         if (init) {
4419                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4420         } else {
4421                 lo->ll_pos_last_checkpoint =
4422                                         lfsck->li_pos_current.lp_oit_cookie;
4423                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4424                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4425                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4426                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4427                 com->lc_new_checked = 0;
4428         }
4429
4430         rc = lfsck_layout_store(env, com);
4431         up_write(&com->lc_sem);
4432
4433         CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4434                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4435                lfsck->li_pos_current.lp_oit_cookie, rc);
4436
4437         return rc;
4438 }
4439
4440 static int lfsck_layout_prep(const struct lu_env *env,
4441                              struct lfsck_component *com,
4442                              struct lfsck_start *start)
4443 {
4444         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4445         struct lfsck_layout     *lo     = com->lc_file_ram;
4446         struct lfsck_position   *pos    = &com->lc_pos_start;
4447
4448         fid_zero(&pos->lp_dir_parent);
4449         pos->lp_dir_cookie = 0;
4450         if (lo->ll_status == LS_COMPLETED ||
4451             lo->ll_status == LS_PARTIAL ||
4452             /* To handle orphan, must scan from the beginning. */
4453             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4454                 int rc;
4455
4456                 rc = lfsck_layout_reset(env, com, false);
4457                 if (rc == 0)
4458                         rc = lfsck_set_param(env, lfsck, start, true);
4459
4460                 if (rc != 0) {
4461                         CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4462                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4463
4464                         return rc;
4465                 }
4466         }
4467
4468         down_write(&com->lc_sem);
4469         lo->ll_time_latest_start = cfs_time_current_sec();
4470         spin_lock(&lfsck->li_lock);
4471         if (lo->ll_flags & LF_SCANNED_ONCE) {
4472                 if (!lfsck->li_drop_dryrun ||
4473                     lo->ll_pos_first_inconsistent == 0) {
4474                         lo->ll_status = LS_SCANNING_PHASE2;
4475                         list_move_tail(&com->lc_link,
4476                                        &lfsck->li_list_double_scan);
4477                         pos->lp_oit_cookie = 0;
4478                 } else {
4479                         int i;
4480
4481                         lo->ll_status = LS_SCANNING_PHASE1;
4482                         lo->ll_run_time_phase1 = 0;
4483                         lo->ll_run_time_phase2 = 0;
4484                         lo->ll_objs_checked_phase1 = 0;
4485                         lo->ll_objs_checked_phase2 = 0;
4486                         lo->ll_objs_failed_phase1 = 0;
4487                         lo->ll_objs_failed_phase2 = 0;
4488                         for (i = 0; i < LLIT_MAX; i++)
4489                                 lo->ll_objs_repaired[i] = 0;
4490
4491                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4492                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4493                 }
4494         } else {
4495                 lo->ll_status = LS_SCANNING_PHASE1;
4496                 if (!lfsck->li_drop_dryrun ||
4497                     lo->ll_pos_first_inconsistent == 0)
4498                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4499                 else
4500                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4501         }
4502         spin_unlock(&lfsck->li_lock);
4503         up_write(&com->lc_sem);
4504
4505         return 0;
4506 }
4507
4508 static int lfsck_layout_slave_prep(const struct lu_env *env,
4509                                    struct lfsck_component *com,
4510                                    struct lfsck_start_param *lsp)
4511 {
4512         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4513         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4514         struct lfsck_layout             *lo     = com->lc_file_ram;
4515         struct lfsck_start              *start  = lsp->lsp_start;
4516         int                              rc;
4517
4518         rc = lfsck_layout_prep(env, com, start);
4519         if (rc != 0)
4520                 return rc;
4521
4522         if (lo->ll_flags & LF_CRASHED_LASTID &&
4523             list_empty(&llsd->llsd_master_list)) {
4524                 LASSERT(lfsck->li_out_notify != NULL);
4525
4526                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4527                                      LE_LASTID_REBUILDING);
4528         }
4529
4530         if (!lsp->lsp_index_valid)
4531                 return 0;
4532
4533         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4534         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4535                 LASSERT(!llsd->llsd_rbtree_valid);
4536
4537                 write_lock(&llsd->llsd_rb_lock);
4538                 rc = lfsck_rbtree_setup(env, com);
4539                 write_unlock(&llsd->llsd_rb_lock);
4540         }
4541
4542         CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4543                LPU64"]\n", lfsck_lfsck2name(lfsck),
4544                com->lc_pos_start.lp_oit_cookie);
4545
4546         return rc;
4547 }
4548
4549 static int lfsck_layout_master_prep(const struct lu_env *env,
4550                                     struct lfsck_component *com,
4551                                     struct lfsck_start_param *lsp)
4552 {
4553         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4554         struct lfsck_layout_master_data *llmd    = com->lc_data;
4555         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4556         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4557         struct lfsck_thread_args        *lta;
4558         struct task_struct              *task;
4559         int                              rc;
4560         ENTRY;
4561
4562         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4563         if (rc != 0)
4564                 RETURN(rc);
4565
4566         llmd->llmd_assistant_status = 0;
4567         llmd->llmd_post_result = 0;
4568         llmd->llmd_to_post = 0;
4569         llmd->llmd_to_double_scan = 0;
4570         llmd->llmd_in_double_scan = 0;
4571         llmd->llmd_exit = 0;
4572         thread_set_flags(athread, 0);
4573
4574         lta = lfsck_thread_args_init(lfsck, com, lsp);
4575         if (IS_ERR(lta))
4576                 RETURN(PTR_ERR(lta));
4577
4578         task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout");
4579         if (IS_ERR(task)) {
4580                 rc = PTR_ERR(task);
4581                 CERROR("%s: cannot start LFSCK layout assistant thread: "
4582                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4583                 lfsck_thread_args_fini(lta);
4584         } else {
4585                 struct l_wait_info lwi = { 0 };
4586
4587                 l_wait_event(mthread->t_ctl_waitq,
4588                              thread_is_running(athread) ||
4589                              thread_is_stopped(athread),
4590                              &lwi);
4591                 if (unlikely(!thread_is_running(athread)))
4592                         rc = llmd->llmd_assistant_status;
4593                 else
4594                         rc = 0;
4595         }
4596
4597         CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4598                LPU64"\n", lfsck_lfsck2name(lfsck),
4599                com->lc_pos_start.lp_oit_cookie);
4600
4601         RETURN(rc);
4602 }
4603
4604 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4605 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4606                                      struct lfsck_component *com,
4607                                      struct dt_object *parent,
4608                                      struct lov_mds_md_v1 *lmm)
4609 {
4610         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4611         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4612         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4613         struct lfsck_layout             *lo      = com->lc_file_ram;
4614         struct lfsck_layout_master_data *llmd    = com->lc_data;
4615         struct lfsck_layout_object      *llo     = NULL;
4616         struct lov_ost_data_v1          *objs;
4617         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4618         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4619         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4620                 struct l_wait_info       lwi     = { 0 };
4621         struct lu_buf                    buf;
4622         int                              rc      = 0;
4623         int                              i;
4624         __u32                            magic;
4625         __u16                            count;
4626         __u16                            gen;
4627         ENTRY;
4628
4629         lfsck_buf_init(&buf, &info->lti_old_pfid,
4630                        sizeof(struct filter_fid_old));
4631         count = le16_to_cpu(lmm->lmm_stripe_count);
4632         gen = le16_to_cpu(lmm->lmm_layout_gen);
4633         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4634          * been verified in lfsck_layout_verify_header() already. If some
4635          * new magic introduced in the future, then layout LFSCK needs to
4636          * be updated also. */
4637         magic = le32_to_cpu(lmm->lmm_magic);
4638         if (magic == LOV_MAGIC_V1) {
4639                 objs = &lmm->lmm_objects[0];
4640         } else {
4641                 LASSERT(magic == LOV_MAGIC_V3);
4642                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4643         }
4644
4645         for (i = 0; i < count; i++, objs++) {
4646                 struct lu_fid           *fid    = &info->lti_fid;
4647                 struct ost_id           *oi     = &info->lti_oi;
4648                 struct lfsck_layout_req *llr;
4649                 struct lfsck_tgt_desc   *tgt    = NULL;
4650                 struct dt_object        *cobj   = NULL;
4651                 __u32                    index;
4652                 bool                     wakeup = false;
4653
4654                 if (unlikely(lovea_slot_is_dummy(objs)))
4655                         continue;
4656
4657                 l_wait_event(mthread->t_ctl_waitq,
4658                              bk->lb_async_windows == 0 ||
4659                              llmd->llmd_prefetched < bk->lb_async_windows ||
4660                              !thread_is_running(mthread) ||
4661                              thread_is_stopped(athread),
4662                              &lwi);
4663
4664                 if (unlikely(!thread_is_running(mthread)) ||
4665                              thread_is_stopped(athread))
4666                         GOTO(out, rc = 0);
4667
4668                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4669                 index = le32_to_cpu(objs->l_ost_idx);
4670                 rc = ostid_to_fid(fid, oi, index);
4671                 if (rc != 0) {
4672                         CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
4673                                ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
4674                                PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
4675                         goto next;
4676                 }
4677
4678                 tgt = lfsck_tgt_get(ltds, index);
4679                 if (unlikely(tgt == NULL)) {
4680                         CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4681                                "did not join the layout LFSCK\n",
4682                                lfsck_lfsck2name(lfsck), index);
4683                         lo->ll_flags |= LF_INCOMPLETE;
4684                         goto next;
4685                 }
4686
4687                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
4688                 if (IS_ERR(cobj)) {
4689                         rc = PTR_ERR(cobj);
4690                         goto next;
4691                 }
4692
4693                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4694                 if (rc != 0)
4695                         goto next;
4696
4697                 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
4698                                           BYPASS_CAPA);
4699                 if (rc != 0)
4700                         goto next;
4701
4702                 if (llo == NULL) {
4703                         llo = lfsck_layout_object_init(env, parent, gen);
4704                         if (IS_ERR(llo)) {
4705                                 rc = PTR_ERR(llo);
4706                                 goto next;
4707                         }
4708                 }
4709
4710                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4711                 if (IS_ERR(llr)) {
4712                         rc = PTR_ERR(llr);
4713                         goto next;
4714                 }
4715
4716                 cobj = NULL;
4717                 spin_lock(&llmd->llmd_lock);
4718                 if (llmd->llmd_assistant_status < 0) {
4719                         spin_unlock(&llmd->llmd_lock);
4720                         lfsck_layout_req_fini(env, llr);
4721                         lfsck_tgt_put(tgt);
4722                         RETURN(llmd->llmd_assistant_status);
4723                 }
4724
4725                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4726                 if (llmd->llmd_prefetched == 0)
4727                         wakeup = true;
4728
4729                 llmd->llmd_prefetched++;
4730                 spin_unlock(&llmd->llmd_lock);
4731                 if (wakeup)
4732                         wake_up_all(&athread->t_ctl_waitq);
4733
4734 next:
4735                 down_write(&com->lc_sem);
4736                 com->lc_new_checked++;
4737                 if (rc < 0)
4738                         lfsck_layout_record_failure(env, lfsck, lo);
4739                 up_write(&com->lc_sem);
4740
4741                 if (cobj != NULL && !IS_ERR(cobj))
4742                         lu_object_put(env, &cobj->do_lu);
4743
4744                 if (likely(tgt != NULL))
4745                         lfsck_tgt_put(tgt);
4746
4747                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4748                         GOTO(out, rc);
4749         }
4750
4751         GOTO(out, rc = 0);
4752
4753 out:
4754         if (llo != NULL && !IS_ERR(llo))
4755                 lfsck_layout_object_put(env, llo);
4756
4757         return rc;
4758 }
4759
4760 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4761  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4762  * list ::llmd_req_list.
4763  *
4764  * For each request on above list, the lfsck_layout_assistant thread compares
4765  * the OST side attribute with local attribute, if inconsistent, then repair it.
4766  *
4767  * All above processing is async mode with pipeline. */
4768 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4769                                         struct lfsck_component *com,
4770                                         struct dt_object *obj)
4771 {
4772         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4773         struct ost_id                   *oi     = &info->lti_oi;
4774         struct lfsck_layout             *lo     = com->lc_file_ram;
4775         struct lfsck_layout_master_data *llmd   = com->lc_data;
4776         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4777         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4778         struct thandle                  *handle = NULL;
4779         struct lu_buf                   *buf    = &info->lti_big_buf;
4780         struct lov_mds_md_v1            *lmm    = NULL;
4781         struct dt_device                *dev    = lfsck->li_bottom;
4782         struct lustre_handle             lh     = { 0 };
4783         struct lu_buf                    ea_buf = { 0 };
4784         int                              rc     = 0;
4785         int                              size   = 0;
4786         bool                             locked = false;
4787         bool                             stripe = false;
4788         bool                             bad_oi = false;
4789         ENTRY;
4790
4791         if (!S_ISREG(lfsck_object_type(obj)))
4792                 GOTO(out, rc = 0);
4793
4794         if (llmd->llmd_assistant_status < 0)
4795                 GOTO(out, rc = -ESRCH);
4796
4797         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4798         lmm_oi_cpu_to_le(oi, oi);
4799         dt_read_lock(env, obj, 0);
4800         locked = true;
4801
4802 again:
4803         if (dt_object_exists(obj) == 0 ||
4804             lfsck_is_dead_obj(obj))
4805                 GOTO(out, rc = 0);
4806
4807         rc = lfsck_layout_get_lovea(env, obj, buf);
4808         if (rc <= 0)
4809                 GOTO(out, rc);
4810
4811         size = rc;
4812         lmm = buf->lb_buf;
4813         rc = lfsck_layout_verify_header(lmm);
4814         /* If the LOV EA crashed, then it is possible to be rebuilt later
4815          * when handle orphan OST-objects. */
4816         if (rc != 0)
4817                 GOTO(out, rc);
4818
4819         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4820                 GOTO(out, stripe = true);
4821
4822         /* Inconsistent lmm_oi, should be repaired. */
4823         bad_oi = true;
4824         lmm->lmm_oi = *oi;
4825
4826         if (bk->lb_param & LPF_DRYRUN) {
4827                 down_write(&com->lc_sem);
4828                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4829                 up_write(&com->lc_sem);
4830
4831                 GOTO(out, stripe = true);
4832         }
4833
4834         if (!lustre_handle_is_used(&lh)) {
4835                 dt_read_unlock(env, obj);
4836                 locked = false;
4837                 rc = lfsck_layout_lock(env, com, obj, &lh,
4838                                        MDS_INODELOCK_LAYOUT |
4839                                        MDS_INODELOCK_XATTR);
4840                 if (rc != 0)
4841                         GOTO(out, rc);
4842
4843                 handle = dt_trans_create(env, dev);
4844                 if (IS_ERR(handle))
4845                         GOTO(out, rc = PTR_ERR(handle));
4846
4847                 lfsck_buf_init(&ea_buf, lmm, size);
4848                 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4849                                           LU_XATTR_REPLACE, handle);
4850                 if (rc != 0)
4851                         GOTO(out, rc);
4852
4853                 rc = dt_trans_start_local(env, dev, handle);
4854                 if (rc != 0)
4855                         GOTO(out, rc);
4856
4857                 dt_write_lock(env, obj, 0);
4858                 locked = true;
4859
4860                 goto again;
4861         }
4862
4863         rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4864                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4865         if (rc != 0)
4866                 GOTO(out, rc);
4867
4868         down_write(&com->lc_sem);
4869         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4870         up_write(&com->lc_sem);
4871
4872         GOTO(out, stripe = true);
4873
4874 out:
4875         if (locked) {
4876                 if (lustre_handle_is_used(&lh))
4877                         dt_write_unlock(env, obj);
4878                 else
4879                         dt_read_unlock(env, obj);
4880         }
4881
4882         if (handle != NULL && !IS_ERR(handle))
4883                 dt_trans_stop(env, dev, handle);
4884
4885         lfsck_layout_unlock(&lh);
4886
4887         if (bad_oi)
4888                 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4889                        DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4890                        bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4891                        PFID(lfsck_dto2fid(obj)), rc);
4892
4893         if (stripe) {
4894                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4895         } else {
4896                 down_write(&com->lc_sem);
4897                 com->lc_new_checked++;
4898                 if (rc < 0)
4899                         lfsck_layout_record_failure(env, lfsck, lo);
4900                 up_write(&com->lc_sem);
4901         }
4902
4903         return rc;
4904 }
4905
4906 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4907                                        struct lfsck_component *com,
4908                                        struct dt_object *obj)
4909 {
4910         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4911         struct lfsck_layout             *lo     = com->lc_file_ram;
4912         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4913         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4914         struct lfsck_layout_seq         *lls;
4915         __u64                            seq;
4916         __u64                            oid;
4917         int                              rc;
4918         ENTRY;
4919
4920         LASSERT(llsd != NULL);
4921
4922         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4923             cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4924                 struct l_wait_info       lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4925                                                            NULL, NULL);
4926                 struct ptlrpc_thread    *thread = &lfsck->li_thread;
4927
4928                 l_wait_event(thread->t_ctl_waitq,
4929                              !thread_is_running(thread),
4930                              &lwi);
4931         }
4932
4933         lfsck_rbtree_update_bitmap(env, com, fid, false);
4934
4935         down_write(&com->lc_sem);
4936         if (fid_is_idif(fid))
4937                 seq = 0;
4938         else if (!fid_is_norm(fid) ||
4939                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4940                 GOTO(unlock, rc = 0);
4941         else
4942                 seq = fid_seq(fid);
4943         com->lc_new_checked++;
4944
4945         lls = lfsck_layout_seq_lookup(llsd, seq);
4946         if (lls == NULL) {
4947                 OBD_ALLOC_PTR(lls);
4948                 if (unlikely(lls == NULL))
4949                         GOTO(unlock, rc = -ENOMEM);
4950
4951                 INIT_LIST_HEAD(&lls->lls_list);
4952                 lls->lls_seq = seq;
4953                 rc = lfsck_layout_lastid_load(env, com, lls);
4954                 if (rc != 0) {
4955                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4956                               "load LAST_ID for "LPX64": rc = %d\n",
4957                               lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4958                         lo->ll_objs_failed_phase1++;
4959                         OBD_FREE_PTR(lls);
4960                         GOTO(unlock, rc);
4961                 }
4962
4963                 lfsck_layout_seq_insert(llsd, lls);
4964         }
4965
4966         if (unlikely(fid_is_last_id(fid)))
4967                 GOTO(unlock, rc = 0);
4968
4969         if (fid_is_idif(fid))
4970                 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
4971         else
4972                 oid = fid_oid(fid);
4973
4974         if (oid > lls->lls_lastid_known)
4975                 lls->lls_lastid_known = oid;
4976
4977         if (oid > lls->lls_lastid) {
4978                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4979                         /* OFD may create new objects during LFSCK scanning. */
4980                         rc = lfsck_layout_lastid_reload(env, com, lls);
4981                         if (unlikely(rc != 0)) {
4982                                 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4983                                       "reload LAST_ID for "LPX64": rc = %d\n",
4984                                       lfsck_lfsck2name(com->lc_lfsck),
4985                                       lls->lls_seq, rc);
4986
4987                                 GOTO(unlock, rc);
4988                         }
4989
4990                         if (oid <= lls->lls_lastid ||
4991                             lo->ll_flags & LF_CRASHED_LASTID)
4992                                 GOTO(unlock, rc = 0);
4993
4994                         LASSERT(lfsck->li_out_notify != NULL);
4995
4996                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4997                                              LE_LASTID_REBUILDING);
4998                         lo->ll_flags |= LF_CRASHED_LASTID;
4999
5000                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
5001                                "LAST_ID file (2) for the sequence "LPX64
5002                                ", old value "LPU64", known value "LPU64"\n",
5003                                lfsck_lfsck2name(lfsck), lls->lls_seq,
5004                                lls->lls_lastid, oid);
5005                 }
5006
5007                 lls->lls_lastid = oid;
5008                 lls->lls_dirty = 1;
5009         }
5010
5011         GOTO(unlock, rc = 0);
5012
5013 unlock:
5014         up_write(&com->lc_sem);
5015
5016         return rc;
5017 }
5018
/*
 * Directory-entry hook of the layout LFSCK.  No per-entry work is done here:
 * all four arguments are ignored.
 *
 * Returns 0 unconditionally.
 */
static int
lfsck_layout_exec_dir(const struct lu_env *env, struct lfsck_component *com,
                      struct dt_object *obj, struct lu_dirent *ent)
{
        return 0;
}
5026
5027 static int lfsck_layout_master_post(const struct lu_env *env,
5028                                     struct lfsck_component *com,
5029                                     int result, bool init)
5030 {
5031         struct lfsck_instance           *lfsck   = com->lc_lfsck;
5032         struct lfsck_layout             *lo      = com->lc_file_ram;
5033         struct lfsck_layout_master_data *llmd    = com->lc_data;
5034         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
5035         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5036         struct l_wait_info               lwi     = { 0 };
5037         int                              rc;
5038         ENTRY;
5039
5040
5041         llmd->llmd_post_result = result;
5042         llmd->llmd_to_post = 1;
5043         if (llmd->llmd_post_result <= 0)
5044                 llmd->llmd_exit = 1;
5045
5046         wake_up_all(&athread->t_ctl_waitq);
5047         l_wait_event(mthread->t_ctl_waitq,
5048                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
5049                      thread_is_stopped(athread),
5050                      &lwi);
5051
5052         if (llmd->llmd_assistant_status < 0)
5053                 result = llmd->llmd_assistant_status;
5054
5055         down_write(&com->lc_sem);
5056         spin_lock(&lfsck->li_lock);
5057         /* When LFSCK failed, there may be some prefetched objects those are
5058          * not been processed yet, we do not know the exactly position, then
5059          * just restart from last check-point next time. */
5060         if (!init && !llmd->llmd_exit)
5061                 lo->ll_pos_last_checkpoint =
5062                                         lfsck->li_pos_current.lp_oit_cookie;
5063
5064         if (result > 0) {
5065                 lo->ll_status = LS_SCANNING_PHASE2;
5066                 lo->ll_flags |= LF_SCANNED_ONCE;
5067                 lo->ll_flags &= ~LF_UPGRADE;
5068                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5069         } else if (result == 0) {
5070                 lo->ll_status = lfsck->li_status;
5071                 if (lo->ll_status == 0)
5072                         lo->ll_status = LS_STOPPED;
5073                 if (lo->ll_status != LS_PAUSED) {
5074                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5075                 }
5076         } else {
5077                 lo->ll_status = LS_FAILED;
5078                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5079         }
5080         spin_unlock(&lfsck->li_lock);
5081
5082         if (!init) {
5083                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5084                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5085                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5086                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5087                 com->lc_new_checked = 0;
5088         }
5089
5090         rc = lfsck_layout_store(env, com);
5091         up_write(&com->lc_sem);
5092
5093         CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
5094                lfsck_lfsck2name(lfsck), rc);
5095
5096         RETURN(rc);
5097 }
5098
5099 static int lfsck_layout_slave_post(const struct lu_env *env,
5100                                    struct lfsck_component *com,
5101                                    int result, bool init)
5102 {
5103         struct lfsck_instance   *lfsck = com->lc_lfsck;
5104         struct lfsck_layout     *lo    = com->lc_file_ram;
5105         int                      rc;
5106         bool                     done  = false;
5107
5108         rc = lfsck_layout_lastid_store(env, com);
5109         if (rc != 0)
5110                 result = rc;
5111
5112         LASSERT(lfsck->li_out_notify != NULL);
5113
5114         down_write(&com->lc_sem);
5115         spin_lock(&lfsck->li_lock);
5116         if (!init)
5117                 lo->ll_pos_last_checkpoint =
5118                                         lfsck->li_pos_current.lp_oit_cookie;
5119         if (result > 0) {
5120                 lo->ll_status = LS_SCANNING_PHASE2;
5121                 lo->ll_flags |= LF_SCANNED_ONCE;
5122                 if (lo->ll_flags & LF_CRASHED_LASTID) {
5123                         done = true;
5124                         lo->ll_flags &= ~LF_CRASHED_LASTID;
5125
5126                         CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
5127                                "crashed LAST_ID files successfully\n",
5128                                lfsck_lfsck2name(lfsck));
5129                 }
5130                 lo->ll_flags &= ~LF_UPGRADE;
5131                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5132         } else if (result == 0) {
5133                 lo->ll_status = lfsck->li_status;
5134                 if (lo->ll_status == 0)
5135                         lo->ll_status = LS_STOPPED;
5136                 if (lo->ll_status != LS_PAUSED)
5137                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5138         } else {
5139                 lo->ll_status = LS_FAILED;
5140                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5141         }
5142         spin_unlock(&lfsck->li_lock);
5143
5144         if (done)
5145                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5146                                      LE_LASTID_REBUILT);
5147
5148         if (!init) {
5149                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5150                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5151                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5152                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5153                 com->lc_new_checked = 0;
5154         }
5155
5156         rc = lfsck_layout_store(env, com);
5157         up_write(&com->lc_sem);
5158
5159         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
5160
5161         if (result <= 0)
5162                 lfsck_rbtree_cleanup(env, com);
5163
5164         CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
5165                lfsck_lfsck2name(lfsck), rc);
5166
5167         return rc;
5168 }
5169
5170 static int lfsck_layout_dump(const struct lu_env *env,
5171                              struct lfsck_component *com, struct seq_file *m)
5172 {
5173         struct lfsck_instance   *lfsck = com->lc_lfsck;
5174         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
5175         struct lfsck_layout     *lo    = com->lc_file_ram;
5176         int                      rc;
5177
5178         down_read(&com->lc_sem);
5179         seq_printf(m, "name: lfsck_layout\n"
5180                       "magic: %#x\n"
5181                       "version: %d\n"
5182                       "status: %s\n",
5183                       lo->ll_magic,
5184                       bk->lb_version,
5185                       lfsck_status2names(lo->ll_status));
5186
5187         rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
5188         if (rc < 0)
5189                 goto out;
5190
5191         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
5192         if (rc < 0)
5193                 goto out;
5194
5195         rc = lfsck_time_dump(m, lo->ll_time_last_complete,
5196                              "time_since_last_completed");
5197         if (rc < 0)
5198                 goto out;
5199
5200         rc = lfsck_time_dump(m, lo->ll_time_latest_start,
5201                              "time_since_latest_start");
5202         if (rc < 0)
5203                 goto out;
5204
5205         rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
5206                              "time_since_last_checkpoint");
5207         if (rc < 0)
5208                 goto out;
5209
5210         seq_printf(m, "latest_start_position: "LPU64"\n"
5211                       "last_checkpoint_position: "LPU64"\n"
5212                       "first_failure_position: "LPU64"\n",
5213                       lo->ll_pos_latest_start,
5214                       lo->ll_pos_last_checkpoint,
5215                       lo->ll_pos_first_inconsistent);
5216
5217         seq_printf(m, "success_count: %u\n"
5218                       "repaired_dangling: "LPU64"\n"
5219                       "repaired_unmatched_pair: "LPU64"\n"
5220                       "repaired_multiple_referenced: "LPU64"\n"
5221                       "repaired_orphan: "LPU64"\n"
5222                       "repaired_inconsistent_owner: "LPU64"\n"
5223                       "repaired_others: "LPU64"\n"
5224                       "skipped: "LPU64"\n"
5225                       "failed_phase1: "LPU64"\n"
5226                       "failed_phase2: "LPU64"\n",
5227                       lo->ll_success_count,
5228                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
5229                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
5230                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
5231                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
5232                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
5233                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
5234                       lo->ll_objs_skipped,
5235                       lo->ll_objs_failed_phase1,
5236                       lo->ll_objs_failed_phase2);
5237
5238         if (lo->ll_status == LS_SCANNING_PHASE1) {
5239                 __u64 pos;
5240                 const struct dt_it_ops *iops;
5241                 cfs_duration_t duration = cfs_time_current() -
5242                                           lfsck->li_time_last_checkpoint;
5243                 __u64 checked = lo->ll_objs_checked_phase1 +
5244                                 com->lc_new_checked;
5245                 __u64 speed = checked;
5246                 __u64 new_checked = com->lc_new_checked * HZ;
5247                 __u32 rtime = lo->ll_run_time_phase1 +
5248                               cfs_duration_sec(duration + HALF_SEC);
5249
5250                 if (duration != 0)
5251                         do_div(new_checked, duration);
5252                 if (rtime != 0)
5253                         do_div(speed, rtime);
5254                 seq_printf(m, "checked_phase1: "LPU64"\n"
5255                               "checked_phase2: "LPU64"\n"
5256                               "run_time_phase1: %u seconds\n"
5257                               "run_time_phase2: %u seconds\n"
5258                               "average_speed_phase1: "LPU64" items/sec\n"
5259                               "average_speed_phase2: N/A\n"
5260                               "real-time_speed_phase1: "LPU64" items/sec\n"
5261                               "real-time_speed_phase2: N/A\n",
5262                               checked,
5263                               lo->ll_objs_checked_phase2,
5264                               rtime,
5265                               lo->ll_run_time_phase2,
5266                               speed,
5267                               new_checked);
5268
5269                 LASSERT(lfsck->li_di_oit != NULL);
5270
5271                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
5272
5273                 /* The low layer otable-based iteration position may NOT
5274                  * exactly match the layout-based directory traversal
5275                  * cookie. Generally, it is not a serious issue. But the
5276                  * caller should NOT make assumption on that. */
5277                 pos = iops->store(env, lfsck->li_di_oit);
5278                 if (!lfsck->li_current_oit_processed)
5279                         pos--;
5280                 seq_printf(m, "current_position: "LPU64"\n", pos);
5281
5282         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
5283                 cfs_duration_t duration = cfs_time_current() -
5284                                           lfsck->li_time_last_checkpoint;
5285                 __u64 checked = lo->ll_objs_checked_phase2 +
5286                                 com->lc_new_checked;
5287                 __u64 speed1 = lo->ll_objs_checked_phase1;
5288                 __u64 speed2 = checked;
5289                 __u64 new_checked = com->lc_new_checked * HZ;
5290                 __u32 rtime = lo->ll_run_time_phase2 +
5291                               cfs_duration_sec(duration + HALF_SEC);
5292
5293                 if (duration != 0)
5294                         do_div(new_checked, duration);
5295                 if (lo->ll_run_time_phase1 != 0)
5296                         do_div(speed1, lo->ll_run_time_phase1);
5297                 if (rtime != 0)
5298                         do_div(speed2, rtime);
5299                 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
5300                                 "checked_phase2: "LPU64"\n"
5301                                 "run_time_phase1: %u seconds\n"
5302                                 "run_time_phase2: %u seconds\n"
5303                                 "average_speed_phase1: "LPU64" items/sec\n"
5304                                 "average_speed_phase2: "LPU64" items/sec\n"
5305                                 "real-time_speed_phase1: N/A\n"
5306                                 "real-time_speed_phase2: "LPU64" items/sec\n"
5307                                 "current_position: "DFID"\n",
5308                                 lo->ll_objs_checked_phase1,
5309                                 checked,
5310                                 lo->ll_run_time_phase1,
5311                                 rtime,
5312                                 speed1,
5313                                 speed2,
5314                                 new_checked,
5315                                 PFID(&com->lc_fid_latest_scanned_phase2));
5316                 if (rc <= 0)
5317                         goto out;
5318
5319         } else {
5320                 __u64 speed1 = lo->ll_objs_checked_phase1;
5321                 __u64 speed2 = lo->ll_objs_checked_phase2;
5322
5323                 if (lo->ll_run_time_phase1 != 0)
5324                         do_div(speed1, lo->ll_run_time_phase1);
5325                 if (lo->ll_run_time_phase2 != 0)
5326                         do_div(speed2, lo->ll_run_time_phase2);
5327                 seq_printf(m, "checked_phase1: "LPU64"\n"
5328                            "checked_phase2: "LPU64"\n"
5329                            "run_time_phase1: %u seconds\n"
5330                            "run_time_phase2: %u seconds\n"
5331                            "average_speed_phase1: "LPU64" items/sec\n"
5332                            "average_speed_phase2: "LPU64" objs/sec\n"
5333                            "real-time_speed_phase1: N/A\n"
5334                            "real-time_speed_phase2: N/A\n"
5335                            "current_position: N/A\n",
5336                            lo->ll_objs_checked_phase1,
5337                            lo->ll_objs_checked_phase2,
5338                            lo->ll_run_time_phase1,
5339                            lo->ll_run_time_phase2,
5340                            speed1,
5341                            speed2);
5342         }
5343 out:
5344         up_read(&com->lc_sem);
5345
5346         return rc;
5347 }
5348
5349 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5350                                            struct lfsck_component *com)
5351 {
5352         struct lfsck_layout_master_data *llmd    = com->lc_data;
5353         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5354         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5355         struct lfsck_layout             *lo      = com->lc_file_ram;
5356         struct l_wait_info               lwi     = { 0 };
5357
5358         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5359                 return 0;
5360
5361         llmd->llmd_to_double_scan = 1;
5362         wake_up_all(&athread->t_ctl_waitq);
5363         l_wait_event(mthread->t_ctl_waitq,
5364                      llmd->llmd_in_double_scan ||
5365                      thread_is_stopped(athread),
5366                      &lwi);
5367         if (llmd->llmd_assistant_status < 0)
5368                 return llmd->llmd_assistant_status;
5369
5370         return 0;
5371 }
5372
5373 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5374                                           struct lfsck_component *com)
5375 {
5376         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5377         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5378         struct lfsck_layout             *lo     = com->lc_file_ram;
5379         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5380         int                              rc;
5381         ENTRY;
5382
5383         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5384                 lfsck_rbtree_cleanup(env, com);
5385                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5386                 RETURN(0);
5387         }
5388
5389         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
5390                lfsck_lfsck2name(lfsck));
5391
5392         atomic_inc(&lfsck->li_double_scan_count);
5393
5394         com->lc_new_checked = 0;
5395         com->lc_new_scanned = 0;
5396         com->lc_time_last_checkpoint = cfs_time_current();
5397         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5398                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5399
5400         while (1) {
5401                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5402                                                      NULL, NULL);
5403
5404                 rc = lfsck_layout_slave_query_master(env, com);
5405                 if (list_empty(&llsd->llsd_master_list)) {
5406                         if (unlikely(!thread_is_running(thread)))
5407                                 rc = 0;
5408                         else
5409                                 rc = 1;
5410
5411                         GOTO(done, rc);
5412                 }
5413
5414                 if (rc < 0)
5415                         GOTO(done, rc);
5416
5417                 rc = l_wait_event(thread->t_ctl_waitq,
5418                                   !thread_is_running(thread) ||
5419                                   list_empty(&llsd->llsd_master_list),
5420                                   &lwi);
5421                 if (unlikely(!thread_is_running(thread)))
5422                         GOTO(done, rc = 0);
5423
5424                 if (rc == -ETIMEDOUT)
5425                         continue;
5426
5427                 GOTO(done, rc = (rc < 0 ? rc : 1));
5428         }
5429
5430 done:
5431         rc = lfsck_layout_double_scan_result(env, com, rc);
5432
5433         lfsck_rbtree_cleanup(env, com);
5434         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5435         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5436                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5437
5438         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5439                "status %d: rc = %d\n",
5440                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
5441
5442         return rc;
5443 }
5444
5445 static void lfsck_layout_master_data_release(const struct lu_env *env,
5446                                              struct lfsck_component *com)
5447 {
5448         struct lfsck_layout_master_data *llmd   = com->lc_data;
5449         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5450         struct lfsck_tgt_descs          *ltds;
5451         struct lfsck_tgt_desc           *ltd;
5452         struct lfsck_tgt_desc           *next;
5453
5454         LASSERT(llmd != NULL);
5455         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5456                 thread_is_stopped(&llmd->llmd_thread));
5457         LASSERT(list_empty(&llmd->llmd_req_list));
5458
5459         com->lc_data = NULL;
5460
5461         ltds = &lfsck->li_ost_descs;
5462         spin_lock(&ltds->ltd_lock);
5463         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5464                                  ltd_layout_phase_list) {
5465                 list_del_init(&ltd->ltd_layout_phase_list);
5466         }
5467         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5468                                  ltd_layout_phase_list) {
5469                 list_del_init(&ltd->ltd_layout_phase_list);
5470         }
5471         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5472                                  ltd_layout_list) {
5473                 list_del_init(&ltd->ltd_layout_list);
5474         }
5475         spin_unlock(&ltds->ltd_lock);
5476
5477         ltds = &lfsck->li_mdt_descs;
5478         spin_lock(&ltds->ltd_lock);
5479         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5480                                  ltd_layout_phase_list) {
5481                 list_del_init(&ltd->ltd_layout_phase_list);
5482         }
5483         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5484                                  ltd_layout_phase_list) {
5485                 list_del_init(&ltd->ltd_layout_phase_list);
5486         }
5487         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5488                                  ltd_layout_list) {
5489                 list_del_init(&ltd->ltd_layout_list);
5490         }
5491         spin_unlock(&ltds->ltd_lock);
5492
5493         OBD_FREE_PTR(llmd);
5494 }
5495
5496 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5497                                             struct lfsck_component *com)
5498 {
5499         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5500         struct lfsck_layout_seq          *lls;
5501         struct lfsck_layout_seq          *next;
5502         struct lfsck_layout_slave_target *llst;
5503         struct lfsck_layout_slave_target *tmp;
5504
5505         LASSERT(llsd != NULL);
5506
5507         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5508                                      lls_list) {
5509                 list_del_init(&lls->lls_list);
5510                 lfsck_object_put(env, lls->lls_lastid_obj);
5511                 OBD_FREE_PTR(lls);
5512         }
5513
5514         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5515                                  llst_list) {
5516                 list_del_init(&llst->llst_list);
5517                 OBD_FREE_PTR(llst);
5518         }
5519
5520         lfsck_rbtree_cleanup(env, com);
5521         com->lc_data = NULL;
5522         OBD_FREE_PTR(llsd);
5523 }
5524
5525 static void lfsck_layout_master_quit(const struct lu_env *env,
5526                                      struct lfsck_component *com)
5527 {
5528         struct lfsck_layout_master_data *llmd    = com->lc_data;
5529         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5530         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5531         struct l_wait_info               lwi     = { 0 };
5532
5533         llmd->llmd_exit = 1;
5534         wake_up_all(&athread->t_ctl_waitq);
5535         l_wait_event(mthread->t_ctl_waitq,
5536                      thread_is_init(athread) ||
5537                      thread_is_stopped(athread),
5538                      &lwi);
5539 }
5540
/**
 * Quit handler of the layout LFSCK slave: only cleans up the rbtree.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] com       pointer to the layout LFSCK component
 */
static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
5546
5547 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5548                                          struct lfsck_component *com,
5549                                          struct lfsck_request *lr)
5550 {
5551         struct lfsck_instance           *lfsck = com->lc_lfsck;
5552         struct lfsck_layout             *lo    = com->lc_file_ram;
5553         struct lfsck_layout_master_data *llmd  = com->lc_data;
5554         struct lfsck_tgt_descs          *ltds;
5555         struct lfsck_tgt_desc           *ltd;
5556         bool                             fail  = false;
5557         ENTRY;
5558
5559         if (lr->lr_event == LE_PAIRS_VERIFY) {
5560                 int rc;
5561
5562                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5563                                                      &lr->lr_fid2);
5564
5565                 RETURN(rc);
5566         }
5567
5568         CDEBUG(D_LFSCK, "%s: layout LFSCK master handle notify %u "
5569                "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
5570                lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5571                lr->lr_index, lr->lr_status);
5572
5573         if (lr->lr_event != LE_PHASE1_DONE &&
5574             lr->lr_event != LE_PHASE2_DONE &&
5575             lr->lr_event != LE_PEER_EXIT)
5576                 RETURN(-EINVAL);
5577
5578         if (lr->lr_flags & LEF_FROM_OST)
5579                 ltds = &lfsck->li_ost_descs;
5580         else
5581                 ltds = &lfsck->li_mdt_descs;
5582         spin_lock(&ltds->ltd_lock);
5583         ltd = LTD_TGT(ltds, lr->lr_index);
5584         if (ltd == NULL) {
5585                 spin_unlock(&ltds->ltd_lock);
5586
5587                 RETURN(-ENXIO);
5588         }
5589
5590         list_del_init(&ltd->ltd_layout_phase_list);
5591         switch (lr->lr_event) {
5592         case LE_PHASE1_DONE:
5593                 if (lr->lr_status <= 0) {
5594                         ltd->ltd_layout_done = 1;
5595                         list_del_init(&ltd->ltd_layout_list);
5596                         lo->ll_flags |= LF_INCOMPLETE;
5597                         fail = true;
5598                         break;
5599                 }
5600
5601                 if (lr->lr_flags & LEF_FROM_OST) {
5602                         if (list_empty(&ltd->ltd_layout_list))
5603                                 list_add_tail(&ltd->ltd_layout_list,
5604                                               &llmd->llmd_ost_list);
5605                         list_add_tail(&ltd->ltd_layout_phase_list,
5606                                       &llmd->llmd_ost_phase2_list);
5607                 } else {
5608                         if (list_empty(&ltd->ltd_layout_list))
5609                                 list_add_tail(&ltd->ltd_layout_list,
5610                                               &llmd->llmd_mdt_list);
5611                         list_add_tail(&ltd->ltd_layout_phase_list,
5612                                       &llmd->llmd_mdt_phase2_list);
5613                 }
5614                 break;
5615         case LE_PHASE2_DONE:
5616                 ltd->ltd_layout_done = 1;
5617                 list_del_init(&ltd->ltd_layout_list);
5618                 break;
5619         case LE_PEER_EXIT:
5620                 fail = true;
5621                 ltd->ltd_layout_done = 1;
5622                 list_del_init(&ltd->ltd_layout_list);
5623                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
5624                         lo->ll_flags |= LF_INCOMPLETE;
5625                 break;
5626         default:
5627                 break;
5628         }
5629         spin_unlock(&ltds->ltd_lock);
5630
5631         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5632                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5633
5634                 memset(stop, 0, sizeof(*stop));
5635                 stop->ls_status = lr->lr_status;
5636                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5637                 lfsck_stop(env, lfsck->li_bottom, stop);
5638         } else if (lfsck_layout_master_to_orphan(llmd)) {
5639                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5640         }
5641
5642         RETURN(0);
5643 }
5644
5645 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5646                                         struct lfsck_component *com,
5647                                         struct lfsck_request *lr)
5648 {
5649         struct lfsck_instance            *lfsck = com->lc_lfsck;
5650         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5651         struct lfsck_layout_slave_target *llst;
5652         int                               rc;
5653         ENTRY;
5654
5655         switch (lr->lr_event) {
5656         case LE_FID_ACCESSED:
5657                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5658                 RETURN(0);
5659         case LE_CONDITIONAL_DESTROY:
5660                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5661                 RETURN(rc);
5662         case LE_PAIRS_VERIFY: {
5663                 lr->lr_status = LPVS_INIT;
5664                 /* Firstly, if the MDT-object which is claimed via OST-object
5665                  * local stored PFID xattr recognizes the OST-object, then it
5666                  * must be that the client given PFID is wrong. */
5667                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5668                                                     &lr->lr_fid3);
5669                 if (rc <= 0)
5670                         RETURN(0);
5671
5672                 lr->lr_status = LPVS_INCONSISTENT;
5673                 /* The OST-object local stored PFID xattr is stale. We need to
5674                  * check whether the MDT-object that is claimed via the client
5675                  * given PFID information recognizes the OST-object or not. If
5676                  * matches, then need to update the OST-object's PFID xattr. */
5677                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5678                                                     &lr->lr_fid2);
5679                 /* For rc < 0 case:
5680                  * We are not sure whether the client given PFID information
5681                  * is correct or not, do nothing to avoid improper fixing.
5682                  *
5683                  * For rc > 0 case:
5684                  * The client given PFID information is also invalid, we can
5685                  * NOT fix the OST-object inconsistency.
5686                  */
5687                 if (rc != 0)
5688                         RETURN(rc);
5689
5690                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5691                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5692
5693                 RETURN(rc);
5694         }
5695         case LE_PHASE2_DONE:
5696         case LE_PEER_EXIT:
5697                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5698                        "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5699                        lr->lr_event, lr->lr_index, lr->lr_status);
5700                 break;
5701         default:
5702                 RETURN(-EINVAL);
5703         }
5704
5705         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5706         if (llst == NULL)
5707                 RETURN(-ENXIO);
5708
5709         lfsck_layout_llst_put(llst);
5710         if (list_empty(&llsd->llsd_master_list))
5711                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5712
5713         if (lr->lr_event == LE_PEER_EXIT &&
5714             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5715                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5716
5717                 memset(stop, 0, sizeof(*stop));
5718                 stop->ls_status = lr->lr_status;
5719                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5720                 lfsck_stop(env, lfsck->li_bottom, stop);
5721         }
5722
5723         RETURN(0);
5724 }
5725
5726 static int lfsck_layout_query(const struct lu_env *env,
5727                               struct lfsck_component *com)
5728 {
5729         struct lfsck_layout *lo = com->lc_file_ram;
5730
5731         return lo->ll_status;
5732 }
5733
5734 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5735                                            struct lfsck_component *com,
5736                                            struct lfsck_tgt_descs *ltds,
5737                                            struct lfsck_tgt_desc *ltd,
5738                                            struct ptlrpc_request_set *set)
5739 {
5740         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5741         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5742         struct lfsck_request              *lr    = &info->lti_lr;
5743         struct lfsck_instance             *lfsck = com->lc_lfsck;
5744         int                                rc;
5745
5746         spin_lock(&ltds->ltd_lock);
5747         if (list_empty(&ltd->ltd_layout_list)) {
5748                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5749                 spin_unlock(&ltds->ltd_lock);
5750
5751                 return 0;
5752         }
5753
5754         list_del_init(&ltd->ltd_layout_phase_list);
5755         list_del_init(&ltd->ltd_layout_list);
5756         spin_unlock(&ltds->ltd_lock);
5757
5758         memset(lr, 0, sizeof(*lr));
5759         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5760         lr->lr_event = LE_PEER_EXIT;
5761         lr->lr_active = LFSCK_TYPE_LAYOUT;
5762         lr->lr_status = LS_CO_PAUSED;
5763         if (ltds == &lfsck->li_ost_descs)
5764                 lr->lr_flags = LEF_TO_OST;
5765
5766         laia->laia_com = com;
5767         laia->laia_ltds = ltds;
5768         atomic_inc(&ltd->ltd_ref);
5769         laia->laia_ltd = ltd;
5770         laia->laia_lr = lr;
5771         laia->laia_shared = 0;
5772
5773         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5774                                  lfsck_layout_master_async_interpret,
5775                                  laia, LFSCK_NOTIFY);
5776         if (rc != 0) {
5777                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to notify %s %x "
5778                        "for co-stop: rc = %d\n",
5779                        lfsck_lfsck2name(lfsck),
5780                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5781                        ltd->ltd_index, rc);
5782                 lfsck_tgt_put(ltd);
5783         }
5784
5785         return rc;
5786 }
5787
5788 /* with lfsck::li_lock held */
5789 static int lfsck_layout_slave_join(const struct lu_env *env,
5790                                    struct lfsck_component *com,
5791                                    struct lfsck_start_param *lsp)
5792 {
5793         struct lfsck_instance            *lfsck = com->lc_lfsck;
5794         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5795         struct lfsck_layout_slave_target *llst;
5796         struct lfsck_start               *start = lsp->lsp_start;
5797         int                               rc    = 0;
5798         ENTRY;
5799
5800         if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
5801                 RETURN(0);
5802
5803         if (!lsp->lsp_index_valid)
5804                 RETURN(-EINVAL);
5805
5806         /* If someone is running the LFSCK without orphan handling,
5807          * it will not maintain the object accessing rbtree. So we
5808          * cannot join it for orphan handling. */
5809         if (!llsd->llsd_rbtree_valid)
5810                 RETURN(-EBUSY);
5811
5812         spin_unlock(&lfsck->li_lock);
5813         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5814         spin_lock(&lfsck->li_lock);
5815         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5816                 spin_unlock(&lfsck->li_lock);
5817                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5818                                                       true);
5819                 if (llst != NULL)
5820                         lfsck_layout_llst_put(llst);
5821                 spin_lock(&lfsck->li_lock);
5822                 rc = -EAGAIN;
5823         }
5824
5825         RETURN(rc);
5826 }
5827
5828 static struct lfsck_operations lfsck_layout_master_ops = {
5829         .lfsck_reset            = lfsck_layout_reset,
5830         .lfsck_fail             = lfsck_layout_fail,
5831         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
5832         .lfsck_prep             = lfsck_layout_master_prep,
5833         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
5834         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5835         .lfsck_post             = lfsck_layout_master_post,
5836         .lfsck_interpret        = lfsck_layout_master_async_interpret,
5837         .lfsck_dump             = lfsck_layout_dump,
5838         .lfsck_double_scan      = lfsck_layout_master_double_scan,
5839         .lfsck_data_release     = lfsck_layout_master_data_release,
5840         .lfsck_quit             = lfsck_layout_master_quit,
5841         .lfsck_in_notify        = lfsck_layout_master_in_notify,
5842         .lfsck_query            = lfsck_layout_query,
5843         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
5844 };
5845
5846 static struct lfsck_operations lfsck_layout_slave_ops = {
5847         .lfsck_reset            = lfsck_layout_reset,
5848         .lfsck_fail             = lfsck_layout_fail,
5849         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
5850         .lfsck_prep             = lfsck_layout_slave_prep,
5851         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
5852         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5853         .lfsck_post             = lfsck_layout_slave_post,
5854         .lfsck_dump             = lfsck_layout_dump,
5855         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5856         .lfsck_data_release     = lfsck_layout_slave_data_release,
5857         .lfsck_quit             = lfsck_layout_slave_quit,
5858         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5859         .lfsck_query            = lfsck_layout_query,
5860         .lfsck_join             = lfsck_layout_slave_join,
5861 };
5862
5863 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5864 {
5865         struct lfsck_component  *com;
5866         struct lfsck_layout     *lo;
5867         struct dt_object        *root = NULL;
5868         struct dt_object        *obj;
5869         int                      rc;
5870         ENTRY;
5871
5872         OBD_ALLOC_PTR(com);
5873         if (com == NULL)
5874                 RETURN(-ENOMEM);
5875
5876         INIT_LIST_HEAD(&com->lc_link);
5877         INIT_LIST_HEAD(&com->lc_link_dir);
5878         init_rwsem(&com->lc_sem);
5879         atomic_set(&com->lc_ref, 1);
5880         com->lc_lfsck = lfsck;
5881         com->lc_type = LFSCK_TYPE_LAYOUT;
5882         if (lfsck->li_master) {
5883                 struct lfsck_layout_master_data *llmd;
5884
5885                 com->lc_ops = &lfsck_layout_master_ops;
5886                 OBD_ALLOC_PTR(llmd);
5887                 if (llmd == NULL)
5888                         GOTO(out, rc = -ENOMEM);
5889
5890                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5891                 spin_lock_init(&llmd->llmd_lock);
5892                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5893                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5894                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5895                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5896                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5897                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5898                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5899                 com->lc_data = llmd;
5900         } else {
5901                 struct lfsck_layout_slave_data *llsd;
5902
5903                 com->lc_ops = &lfsck_layout_slave_ops;
5904                 OBD_ALLOC_PTR(llsd);
5905                 if (llsd == NULL)
5906                         GOTO(out, rc = -ENOMEM);
5907
5908                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5909                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5910                 spin_lock_init(&llsd->llsd_lock);
5911                 llsd->llsd_rb_root = RB_ROOT;
5912                 rwlock_init(&llsd->llsd_rb_lock);
5913                 com->lc_data = llsd;
5914         }
5915         com->lc_file_size = sizeof(*lo);
5916         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5917         if (com->lc_file_ram == NULL)
5918                 GOTO(out, rc = -ENOMEM);
5919
5920         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5921         if (com->lc_file_disk == NULL)
5922                 GOTO(out, rc = -ENOMEM);
5923
5924         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5925         if (IS_ERR(root))
5926                 GOTO(out, rc = PTR_ERR(root));
5927
5928         if (unlikely(!dt_try_as_dir(env, root)))
5929                 GOTO(out, rc = -ENOTDIR);
5930
5931         obj = local_file_find_or_create(env, lfsck->li_los, root,
5932                                         lfsck_layout_name,
5933                                         S_IFREG | S_IRUGO | S_IWUSR);
5934         if (IS_ERR(obj))
5935                 GOTO(out, rc = PTR_ERR(obj));
5936
5937         com->lc_obj = obj;
5938         rc = lfsck_layout_load(env, com);
5939         if (rc > 0)
5940                 rc = lfsck_layout_reset(env, com, true);
5941         else if (rc == -ENOENT)
5942                 rc = lfsck_layout_init(env, com);
5943
5944         if (rc != 0)
5945                 GOTO(out, rc);
5946
5947         lo = com->lc_file_ram;
5948         switch (lo->ll_status) {
5949         case LS_INIT:
5950         case LS_COMPLETED:
5951         case LS_FAILED:
5952         case LS_STOPPED:
5953         case LS_PARTIAL:
5954                 spin_lock(&lfsck->li_lock);
5955                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5956                 spin_unlock(&lfsck->li_lock);
5957                 break;
5958         default:
5959                 CERROR("%s: unknown lfsck_layout status %d\n",
5960                        lfsck_lfsck2name(lfsck), lo->ll_status);
5961                 /* fall through */
5962         case LS_SCANNING_PHASE1:
5963         case LS_SCANNING_PHASE2:
5964                 /* No need to store the status to disk right now.
5965                  * If the system crashed before the status stored,
5966                  * it will be loaded back when next time. */
5967                 lo->ll_status = LS_CRASHED;
5968                 lo->ll_flags |= LF_INCOMPLETE;
5969                 /* fall through */
5970         case LS_PAUSED:
5971         case LS_CRASHED:
5972         case LS_CO_FAILED:
5973         case LS_CO_STOPPED:
5974         case LS_CO_PAUSED:
5975                 spin_lock(&lfsck->li_lock);
5976                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5977                 spin_unlock(&lfsck->li_lock);
5978                 break;
5979         }
5980
5981         if (lo->ll_flags & LF_CRASHED_LASTID) {
5982                 LASSERT(lfsck->li_out_notify != NULL);
5983
5984                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5985                                      LE_LASTID_REBUILDING);
5986         }
5987
5988         GOTO(out, rc = 0);
5989
5990 out:
5991         if (root != NULL && !IS_ERR(root))
5992                 lu_object_put(env, &root->do_lu);
5993
5994         if (rc != 0) {
5995                 lfsck_component_cleanup(env, com);
5996                 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
5997                        lfsck_lfsck2name(lfsck), rc);
5998         }
5999
6000         return rc;
6001 }
6002
6003 struct lfsck_orphan_it {
6004         struct lfsck_component           *loi_com;
6005         struct lfsck_rbtree_node         *loi_lrn;
6006         struct lfsck_layout_slave_target *loi_llst;
6007         struct lu_fid                     loi_key;
6008         struct lu_orphan_rec              loi_rec;
6009         __u64                             loi_hash;
6010         unsigned int                      loi_over:1;
6011 };
6012
6013 static int lfsck_fid_match_idx(const struct lu_env *env,
6014                                struct lfsck_instance *lfsck,
6015                                const struct lu_fid *fid, int idx)
6016 {
6017         struct seq_server_site  *ss;
6018         struct lu_server_fld    *sf;
6019         struct lu_seq_range      range  = { 0 };
6020         int                      rc;
6021
6022         /* All abnormal cases will be returned to MDT0. */
6023         if (!fid_is_norm(fid)) {
6024                 if (idx == 0)
6025                         return 1;
6026
6027                 return 0;
6028         }
6029
6030         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
6031         if (unlikely(ss == NULL))
6032                 return -ENOTCONN;
6033
6034         sf = ss->ss_server_fld;
6035         LASSERT(sf != NULL);
6036
6037         fld_range_set_any(&range);
6038         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
6039         if (rc != 0)
6040                 return rc;
6041
6042         if (!fld_range_is_mdt(&range))
6043                 return -EINVAL;
6044
6045         if (range.lsr_index == idx)
6046                 return 1;
6047
6048         return 0;
6049 }
6050
6051 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
6052                                         struct dt_device *dev,
6053                                         struct dt_object *obj)
6054 {
6055         struct thandle *handle;
6056         int             rc;
6057         ENTRY;
6058
6059         handle = dt_trans_create(env, dev);
6060         if (IS_ERR(handle))
6061                 RETURN_EXIT;
6062
6063         rc = dt_declare_ref_del(env, obj, handle);
6064         if (rc != 0)
6065                 GOTO(stop, rc);
6066
6067         rc = dt_declare_destroy(env, obj, handle);
6068         if (rc != 0)
6069                 GOTO(stop, rc);
6070
6071         rc = dt_trans_start_local(env, dev, handle);
6072         if (rc != 0)
6073                 GOTO(stop, rc);
6074
6075         dt_write_lock(env, obj, 0);
6076         rc = dt_ref_del(env, obj, handle);
6077         if (rc == 0)
6078                 rc = dt_destroy(env, obj, handle);
6079         dt_write_unlock(env, obj);
6080
6081         GOTO(stop, rc);
6082
6083 stop:
6084         dt_trans_stop(env, dev, handle);
6085
6086         CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
6087                PFID(lfsck_dto2fid(obj)), rc);
6088
6089         RETURN_EXIT;
6090 }
6091
/**
 * Lookup method of the orphan index: not supported, the orphan index is
 * only walked through its iterator methods.
 *
 * \retval		-EOPNOTSUPP always
 */
static int
lfsck_orphan_index_lookup(const struct lu_env *env, struct dt_object *dt,
			  struct dt_rec *rec, const struct dt_key *key,
			  struct lustre_capa *capa)
{
	return -EOPNOTSUPP;
}
6100
/**
 * Declare-insert method of the orphan index: not supported, records
 * cannot be inserted into the orphan index.
 *
 * \retval		-EOPNOTSUPP always
 */
static int
lfsck_orphan_index_declare_insert(const struct lu_env *env,
				  struct dt_object *dt,
				  const struct dt_rec *rec,
				  const struct dt_key *key,
				  struct thandle *handle)
{
	return -EOPNOTSUPP;
}
6109
/**
 * Insert method of the orphan index: not supported, records cannot be
 * inserted into the orphan index.
 *
 * \retval		-EOPNOTSUPP always
 */
static int
lfsck_orphan_index_insert(const struct lu_env *env, struct dt_object *dt,
			  const struct dt_rec *rec, const struct dt_key *key,
			  struct thandle *handle, struct lustre_capa *capa,
			  int ignore_quota)
{
	return -EOPNOTSUPP;
}
6120
/**
 * Declare-delete method of the orphan index: not supported, records
 * cannot be deleted from the orphan index.
 *
 * \retval		-EOPNOTSUPP always
 */
static int
lfsck_orphan_index_declare_delete(const struct lu_env *env,
				  struct dt_object *dt,
				  const struct dt_key *key,
				  struct thandle *handle)
{
	return -EOPNOTSUPP;
}
6128
/**
 * Delete method of the orphan index: not supported, records cannot be
 * deleted from the orphan index.
 *
 * \retval		-EOPNOTSUPP always
 */
static int
lfsck_orphan_index_delete(const struct lu_env *env, struct dt_object *dt,
			  const struct dt_key *key, struct thandle *handle,
			  struct lustre_capa *capa)
{
	return -EOPNOTSUPP;
}
6137
6138 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
6139                                           struct dt_object *dt,
6140                                           __u32 attr,
6141                                           struct lustre_capa *capa)
6142 {
6143         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
6144         struct lfsck_instance           *lfsck;
6145         struct lfsck_component          *com    = NULL;
6146         struct lfsck_layout_slave_data  *llsd;
6147         struct lfsck_orphan_it          *it     = NULL;
6148         int                              rc     = 0;
6149         ENTRY;
6150
6151         lfsck = lfsck_instance_find(dev, true, false);
6152         if (unlikely(lfsck == NULL))
6153                 RETURN(ERR_PTR(-ENXIO));
6154
6155         com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
6156         if (unlikely(com == NULL))
6157                 GOTO(out, rc = -ENOENT);
6158
6159         llsd = com->lc_data;
6160         if (!llsd->llsd_rbtree_valid)
6161                 GOTO(out, rc = -ESRCH);
6162
6163         OBD_ALLOC_PTR(it);
6164         if (it == NULL)
6165                 GOTO(out, rc = -ENOMEM);
6166
6167         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
6168         if (it->loi_llst == NULL)
6169                 GOTO(out, rc = -ENXIO);
6170
6171         if (dev->dd_record_fid_accessed) {
6172                 /* The first iteration against the rbtree, scan the whole rbtree
6173                  * to remove the nodes which do NOT need to be handled. */
6174                 write_lock(&llsd->llsd_rb_lock);
6175                 if (dev->dd_record_fid_accessed) {
6176                         struct rb_node                  *node;
6177                         struct rb_node                  *next;
6178                         struct lfsck_rbtree_node        *lrn;
6179
6180                         /* No need to record the fid accessing anymore. */
6181                         dev->dd_record_fid_accessed = 0;
6182
6183                         node = rb_first(&llsd->llsd_rb_root);
6184                         while (node != NULL) {
6185                                 next = rb_next(node);
6186                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
6187                                                lrn_node);
6188                                 if (atomic_read(&lrn->lrn_known_count) <=
6189                                     atomic_read(&lrn->lrn_accessed_count)) {
6190                                         rb_erase(node, &llsd->llsd_rb_root);
6191                                         lfsck_rbtree_free(lrn);
6192                                 }
6193                                 node = next;
6194                         }
6195                 }
6196                 write_unlock(&llsd->llsd_rb_lock);
6197         }
6198
6199         /* read lock the rbtree when init, and unlock when fini */
6200         read_lock(&llsd->llsd_rb_lock);
6201         it->loi_com = com;
6202         com = NULL;
6203
6204         GOTO(out, rc = 0);
6205
6206 out:
6207         if (com != NULL)
6208                 lfsck_component_put(env, com);
6209
6210         CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
6211                lfsck_lfsck2name(lfsck), rc);
6212
6213         lfsck_instance_put(env, lfsck);
6214         if (rc != 0) {
6215                 if (it != NULL)
6216                         OBD_FREE_PTR(it);
6217
6218                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
6219         }
6220
6221         return (struct dt_it *)it;
6222 }
6223
6224 static void lfsck_orphan_it_fini(const struct lu_env *env,
6225                                  struct dt_it *di)
6226 {
6227         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
6228         struct lfsck_component           *com   = it->loi_com;
6229         struct lfsck_layout_slave_data   *llsd;
6230         struct lfsck_layout_slave_target *llst;
6231
6232         if (com != NULL) {
6233                 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
6234                        lfsck_lfsck2name(com->lc_lfsck));
6235
6236                 llsd = com->lc_data;
6237                 read_unlock(&llsd->llsd_rb_lock);
6238                 llst = it->loi_llst;
6239                 LASSERT(llst != NULL);
6240
6241                 /* Save the key and hash for iterate next. */
6242                 llst->llst_fid = it->loi_key;
6243                 llst->llst_hash = it->loi_hash;
6244                 lfsck_layout_llst_put(llst);
6245                 lfsck_component_put(env, com);
6246         }
6247         OBD_FREE_PTR(it);
6248 }
6249
6250 /**
6251  * \retval       +1: the iteration finished
6252  * \retval        0: on success, not finished
6253  * \retval      -ve: on error
6254  */
6255 static int lfsck_orphan_it_next(const struct lu_env *env,
6256                                 struct dt_it *di)
6257 {
6258         struct lfsck_thread_info        *info   = lfsck_env_info(env);
6259         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
6260         struct lu_attr                  *la     = &info->lti_la;
6261         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
6262         struct lu_fid                   *key    = &it->loi_key;
6263         struct lu_orphan_rec            *rec    = &it->loi_rec;
6264         struct lfsck_component          *com    = it->loi_com;
6265         struct lfsck_instance           *lfsck  = com->lc_lfsck;
6266         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
6267         struct dt_object                *obj;
6268         struct lfsck_rbtree_node        *lrn;
6269         int                              pos;
6270         int                              rc;
6271         __u32                            save;
6272         __u32                            idx    = it->loi_llst->llst_index;
6273         bool                             exact  = false;
6274         ENTRY;
6275
6276         if (it->loi_over)
6277                 RETURN(1);
6278
6279 again0:
6280         lrn = it->loi_lrn;
6281         if (lrn == NULL) {
6282                 lrn = lfsck_rbtree_search(llsd, key, &exact);
6283                 if (lrn == NULL) {
6284                         it->loi_over = 1;
6285                         RETURN(1);
6286                 }
6287
6288                 it->loi_lrn = lrn;
6289                 if (!exact) {
6290                         key->f_seq = lrn->lrn_seq;
6291                         key->f_oid = lrn->lrn_first_oid;
6292                         key->f_ver = 0;
6293                 }
6294         } else {
6295                 key->f_oid++;
6296                 if (unlikely(key->f_oid == 0)) {
6297                         key->f_seq++;
6298                         it->loi_lrn = NULL;
6299                         goto again0;
6300                 }
6301
6302                 if (key->f_oid >=
6303                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
6304                         it->loi_lrn = NULL;
6305                         goto again0;
6306                 }
6307         }
6308
6309         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
6310                      atomic_read(&lrn->lrn_accessed_count))) {
6311                 struct rb_node *next = rb_next(&lrn->lrn_node);
6312
6313                 while (next != NULL) {
6314                         lrn = rb_entry(next, struct lfsck_rbtree_node,
6315                                        lrn_node);
6316                         if (atomic_read(&lrn->lrn_known_count) >
6317                             atomic_read(&lrn->lrn_accessed_count))
6318                                 break;
6319                         next = rb_next(next);
6320                 }
6321
6322                 if (next == NULL) {
6323                         it->loi_over = 1;
6324                         RETURN(1);
6325                 }
6326
6327                 it->loi_lrn = lrn;
6328                 key->f_seq = lrn->lrn_seq;
6329                 key->f_oid = lrn->lrn_first_oid;
6330                 key->f_ver = 0;
6331         }
6332
6333         pos = key->f_oid - lrn->lrn_first_oid;
6334
6335 again1:
6336         pos = find_next_bit(lrn->lrn_known_bitmap,
6337                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
6338         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
6339                 key->f_oid = lrn->lrn_first_oid + pos;
6340                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
6341                         key->f_seq++;
6342                         key->f_oid = 0;
6343                 }
6344                 it->loi_lrn = NULL;
6345                 goto again0;
6346         }
6347
6348         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
6349                 pos++;
6350                 goto again1;
6351         }
6352
6353         key->f_oid = lrn->lrn_first_oid + pos;
6354         obj = lfsck_object_find(env, lfsck, key);
6355         if (IS_ERR(obj)) {
6356                 rc = PTR_ERR(obj);
6357                 if (rc == -ENOENT) {
6358                         pos++;
6359                         goto again1;
6360                 }
6361                 RETURN(rc);
6362         }
6363
6364         dt_read_lock(env, obj, 0);
6365         if (dt_object_exists(obj) == 0 ||
6366             lfsck_is_dead_obj(obj)) {
6367                 dt_read_unlock(env, obj);
6368                 lfsck_object_put(env, obj);
6369                 pos++;
6370                 goto again1;
6371         }
6372
6373         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
6374         if (rc != 0)
6375                 GOTO(out, rc);
6376
6377         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6378                           XATTR_NAME_FID, BYPASS_CAPA);
6379         if (rc == -ENODATA) {
6380                 /* For the pre-created OST-object, update the bitmap to avoid
6381                  * others LFSCK (second phase) iteration to touch it again. */
6382                 if (la->la_ctime == 0) {
6383                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6384                                 atomic_inc(&lrn->lrn_accessed_count);
6385
6386                         /* For the race between repairing dangling referenced
6387                          * MDT-object and unlink the file, it may left orphan
6388                          * OST-object there. Destroy it now! */
6389                         if (unlikely(!(la->la_mode & S_ISUID))) {
6390                                 dt_read_unlock(env, obj);
6391                                 lfsck_layout_destroy_orphan(env,
6392                                                             lfsck->li_bottom,
6393                                                             obj);
6394                                 lfsck_object_put(env, obj);
6395                                 pos++;
6396                                 goto again1;
6397                         }
6398                 } else if (idx == 0) {
6399                         /* If the orphan OST-object has no parent information,
6400                          * regard it as referenced by the MDT-object on MDT0. */
6401                         fid_zero(&rec->lor_fid);
6402                         rec->lor_uid = la->la_uid;
6403                         rec->lor_gid = la->la_gid;
6404                         GOTO(out, rc = 0);
6405                 }
6406
6407                 dt_read_unlock(env, obj);
6408                 lfsck_object_put(env, obj);
6409                 pos++;
6410                 goto again1;
6411         }
6412
6413         if (rc < 0)
6414                 GOTO(out, rc);
6415
6416         if (rc != sizeof(struct filter_fid) &&
6417             rc != sizeof(struct filter_fid_old))
6418                 GOTO(out, rc = -EINVAL);
6419
6420         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6421         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6422          * MDT-object's FID::f_ver, instead it is the OST-object index in its
6423          * parent MDT-object's layout EA. */
6424         save = rec->lor_fid.f_stripe_idx;
6425         rec->lor_fid.f_ver = 0;
6426         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6427         /* If the orphan OST-object does not claim the MDT, then next.
6428          *
6429          * If we do not know whether it matches or not, then return it
6430          * to the MDT for further check. */
6431         if (rc == 0) {
6432                 dt_read_unlock(env, obj);
6433                 lfsck_object_put(env, obj);
6434                 pos++;
6435                 goto again1;
6436         }
6437
6438         rec->lor_fid.f_stripe_idx = save;
6439         rec->lor_uid = la->la_uid;
6440         rec->lor_gid = la->la_gid;
6441
6442         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6443                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6444                rec->lor_uid, rec->lor_gid);
6445
6446         GOTO(out, rc = 0);
6447
6448 out:
6449         dt_read_unlock(env, obj);
6450         lfsck_object_put(env, obj);
6451         if (rc == 0)
6452                 it->loi_hash++;
6453
6454         return rc;
6455 }
6456
6457 /**
6458  * \retval       +1: locate to the exactly position
6459  * \retval        0: cannot locate to the exactly position,
6460  *                   call next() to move to a valid position.
6461  * \retval      -ve: on error
6462  */
6463 static int lfsck_orphan_it_get(const struct lu_env *env,
6464                                struct dt_it *di,
6465                                const struct dt_key *key)
6466 {
6467         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6468         int                      rc;
6469
6470         it->loi_key = *(struct lu_fid *)key;
6471         rc = lfsck_orphan_it_next(env, di);
6472         if (rc == 1)
6473                 return 0;
6474
6475         if (rc == 0)
6476                 return 1;
6477
6478         return rc;
6479 }
6480
/**
 * Put method of the orphan iterator: intentionally a no-op.
 */
static void
lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
	/* Nothing to be released here. */
}
6485
6486 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6487                                           const struct dt_it *di)
6488 {
6489         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6490
6491         return (struct dt_key *)&it->loi_key;
6492 }
6493
6494 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6495                                     const struct dt_it *di)
6496 {
6497         return sizeof(struct lu_fid);
6498 }
6499
6500 static int lfsck_orphan_it_rec(const struct lu_env *env,
6501                                const struct dt_it *di,
6502                                struct dt_rec *rec,
6503                                __u32 attr)
6504 {
6505         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6506
6507         *(struct lu_orphan_rec *)rec = it->loi_rec;
6508
6509         return 0;
6510 }
6511
6512 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6513                                    const struct dt_it *di)
6514 {
6515         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6516
6517         return it->loi_hash;
6518 }
6519
6520 /**
6521  * \retval       +1: locate to the exactly position
6522  * \retval        0: cannot locate to the exactly position,
6523  *                   call next() to move to a valid position.
6524  * \retval      -ve: on error
6525  */
6526 static int lfsck_orphan_it_load(const struct lu_env *env,
6527                                 const struct dt_it *di,
6528                                 __u64 hash)
6529 {
6530         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6531         struct lfsck_layout_slave_target *llst = it->loi_llst;
6532         int                               rc;
6533
6534         LASSERT(llst != NULL);
6535
6536         if (hash != llst->llst_hash) {
6537                 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6538                        "iteration does not match the one when fini "
6539                        LPU64", to be reset.\n",
6540                        lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6541                        llst->llst_hash);
6542                 fid_zero(&llst->llst_fid);
6543                 llst->llst_hash = 0;
6544         }
6545
6546         it->loi_key = llst->llst_fid;
6547         it->loi_hash = llst->llst_hash;
6548         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6549         if (rc == 1)
6550                 return 0;
6551
6552         if (rc == 0)
6553                 return 1;
6554
6555         return rc;
6556 }
6557
/**
 * Key-rec method of the orphan iterator: does nothing, the buffer is
 * left untouched.
 *
 * \retval		0 always
 */
static int
lfsck_orphan_it_key_rec(const struct lu_env *env, const struct dt_it *di,
			void *key_rec)
{
	return 0;
}
6564
6565 const struct dt_index_operations lfsck_orphan_index_ops = {
6566         .dio_lookup             = lfsck_orphan_index_lookup,
6567         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
6568         .dio_insert             = lfsck_orphan_index_insert,
6569         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
6570         .dio_delete             = lfsck_orphan_index_delete,
6571         .dio_it = {
6572                 .init           = lfsck_orphan_it_init,
6573                 .fini           = lfsck_orphan_it_fini,
6574                 .get            = lfsck_orphan_it_get,
6575                 .put            = lfsck_orphan_it_put,
6576                 .next           = lfsck_orphan_it_next,
6577                 .key            = lfsck_orphan_it_key,
6578                 .key_size       = lfsck_orphan_it_key_size,
6579                 .rec            = lfsck_orphan_it_rec,
6580                 .store          = lfsck_orphan_it_store,
6581                 .load           = lfsck_orphan_it_load,
6582                 .key_rec        = lfsck_orphan_it_key_rec,
6583         }
6584 };