Whamcloud - gitweb
3db63dc87f627aab09a5c647997811bfe1c39f88
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic stamped into lfsck_layout::ll_magic by lfsck_layout_init() and
 * checked by lfsck_layout_load() to detect a broken trace file. */
#define LFSCK_LAYOUT_MAGIC		0xB173AE14

/* Name of the layout LFSCK component.
 * NOTE(review): its users are outside this part of the file. */
static const char lfsck_layout_name[] = "lfsck_layout";
54
55 struct lfsck_layout_seq {
56         struct list_head         lls_list;
57         __u64                    lls_seq;
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
60         struct dt_object        *lls_lastid_obj;
61         unsigned int             lls_dirty:1;
62 };
63
64 struct lfsck_layout_slave_target {
65         /* link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
67         /* The position for next record in the rbtree for iteration. */
68         struct lu_fid           llst_fid;
69         /* Dummy hash for iteration against the rbtree. */
70         __u64                   llst_hash;
71         __u64                   llst_gen;
72         atomic_t                llst_ref;
73         __u32                   llst_index;
74 };
75
76 struct lfsck_layout_slave_data {
77         /* list for lfsck_layout_seq */
78         struct list_head         llsd_seq_list;
79
80         /* list for the masters involve layout verification. */
81         struct list_head         llsd_master_list;
82         spinlock_t               llsd_lock;
83         __u64                    llsd_touch_gen;
84         struct dt_object        *llsd_rb_obj;
85         struct rb_root           llsd_rb_root;
86         rwlock_t                 llsd_rb_lock;
87         unsigned int             llsd_rbtree_valid:1;
88 };
89
90 struct lfsck_layout_object {
91         struct dt_object        *llo_obj;
92         struct lu_attr           llo_attr;
93         atomic_t                 llo_ref;
94         __u16                    llo_gen;
95 };
96
97 struct lfsck_layout_req {
98         struct list_head                 llr_list;
99         struct lfsck_layout_object      *llr_parent;
100         struct dt_object                *llr_child;
101         __u32                            llr_ost_idx;
102         __u32                            llr_lov_idx; /* offset in LOV EA */
103 };
104
105 struct lfsck_layout_master_data {
106         spinlock_t              llmd_lock;
107         struct list_head        llmd_req_list;
108
109         /* list for the ost targets involve layout verification. */
110         struct list_head        llmd_ost_list;
111
112         /* list for the ost targets in phase1 scanning. */
113         struct list_head        llmd_ost_phase1_list;
114
115         /* list for the ost targets in phase1 scanning. */
116         struct list_head        llmd_ost_phase2_list;
117
118         /* list for the mdt targets involve layout verification. */
119         struct list_head        llmd_mdt_list;
120
121         /* list for the mdt targets in phase1 scanning. */
122         struct list_head        llmd_mdt_phase1_list;
123
124         /* list for the mdt targets in phase1 scanning. */
125         struct list_head        llmd_mdt_phase2_list;
126
127         struct ptlrpc_thread    llmd_thread;
128         __u32                   llmd_touch_gen;
129         int                     llmd_prefetched;
130         int                     llmd_assistant_status;
131         int                     llmd_post_result;
132         unsigned int            llmd_to_post:1,
133                                 llmd_to_double_scan:1,
134                                 llmd_in_double_scan:1,
135                                 llmd_exit:1;
136 };
137
/* Argument bundle for an asynchronous request issued by the slave.
 * NOTE(review): the users are outside this part of the file. */
struct lfsck_layout_slave_async_args {
	struct obd_export		 *llsaa_exp;
	struct lfsck_component		 *llsaa_com;
	struct lfsck_layout_slave_target *llsaa_llst;
};
143
144 static struct lfsck_layout_object *
145 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
146                          __u16 gen)
147 {
148         struct lfsck_layout_object *llo;
149         int                         rc;
150
151         OBD_ALLOC_PTR(llo);
152         if (llo == NULL)
153                 return ERR_PTR(-ENOMEM);
154
155         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
156         if (rc != 0) {
157                 OBD_FREE_PTR(llo);
158
159                 return ERR_PTR(rc);
160         }
161
162         lu_object_get(&obj->do_lu);
163         llo->llo_obj = obj;
164         /* The gen can be used to check whether some others have changed the
165          * file layout after LFSCK pre-fetching but before real verification. */
166         llo->llo_gen = gen;
167         atomic_set(&llo->llo_ref, 1);
168
169         return llo;
170 }
171
172 static inline void
173 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
174 {
175         if (atomic_dec_and_test(&llst->llst_ref)) {
176                 LASSERT(list_empty(&llst->llst_list));
177
178                 OBD_FREE_PTR(llst);
179         }
180 }
181
182 static inline int
183 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
184 {
185         struct lfsck_layout_slave_target *llst;
186         struct lfsck_layout_slave_target *tmp;
187         int                               rc   = 0;
188
189         OBD_ALLOC_PTR(llst);
190         if (llst == NULL)
191                 return -ENOMEM;
192
193         INIT_LIST_HEAD(&llst->llst_list);
194         llst->llst_gen = 0;
195         llst->llst_index = index;
196         atomic_set(&llst->llst_ref, 1);
197
198         spin_lock(&llsd->llsd_lock);
199         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
200                 if (tmp->llst_index == index) {
201                         rc = -EALREADY;
202                         break;
203                 }
204         }
205         if (rc == 0)
206                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
207         spin_unlock(&llsd->llsd_lock);
208
209         if (rc != 0)
210                 OBD_FREE_PTR(llst);
211
212         return rc;
213 }
214
215 static inline void
216 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
217                       struct lfsck_layout_slave_target *llst)
218 {
219         bool del = false;
220
221         spin_lock(&llsd->llsd_lock);
222         if (!list_empty(&llst->llst_list)) {
223                 list_del_init(&llst->llst_list);
224                 del = true;
225         }
226         spin_unlock(&llsd->llsd_lock);
227
228         if (del)
229                 lfsck_layout_llst_put(llst);
230 }
231
232 static inline struct lfsck_layout_slave_target *
233 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
234                                __u32 index, bool unlink)
235 {
236         struct lfsck_layout_slave_target *llst;
237
238         spin_lock(&llsd->llsd_lock);
239         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
240                 if (llst->llst_index == index) {
241                         if (unlink)
242                                 list_del_init(&llst->llst_list);
243                         else
244                                 atomic_inc(&llst->llst_ref);
245                         spin_unlock(&llsd->llsd_lock);
246
247                         return llst;
248                 }
249         }
250         spin_unlock(&llsd->llsd_lock);
251
252         return NULL;
253 }
254
255 static inline void lfsck_layout_object_put(const struct lu_env *env,
256                                            struct lfsck_layout_object *llo)
257 {
258         if (atomic_dec_and_test(&llo->llo_ref)) {
259                 lfsck_object_put(env, llo->llo_obj);
260                 OBD_FREE_PTR(llo);
261         }
262 }
263
264 static struct lfsck_layout_req *
265 lfsck_layout_req_init(struct lfsck_layout_object *parent,
266                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
267 {
268         struct lfsck_layout_req *llr;
269
270         OBD_ALLOC_PTR(llr);
271         if (llr == NULL)
272                 return ERR_PTR(-ENOMEM);
273
274         INIT_LIST_HEAD(&llr->llr_list);
275         atomic_inc(&parent->llo_ref);
276         llr->llr_parent = parent;
277         llr->llr_child = child;
278         llr->llr_ost_idx = ost_idx;
279         llr->llr_lov_idx = lov_idx;
280
281         return llr;
282 }
283
284 static inline void lfsck_layout_req_fini(const struct lu_env *env,
285                                          struct lfsck_layout_req *llr)
286 {
287         lu_object_put(env, &llr->llr_child->do_lu);
288         lfsck_layout_object_put(env, llr->llr_parent);
289         OBD_FREE_PTR(llr);
290 }
291
292 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
293 {
294         bool empty = false;
295
296         spin_lock(&llmd->llmd_lock);
297         if (list_empty(&llmd->llmd_req_list))
298                 empty = true;
299         spin_unlock(&llmd->llmd_lock);
300
301         return empty;
302 }
303
304 static int lfsck_layout_get_lovea(const struct lu_env *env,
305                                   struct dt_object *obj, struct lu_buf *buf)
306 {
307         int rc;
308
309 again:
310         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
311         if (rc == -ERANGE) {
312                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
313                                   BYPASS_CAPA);
314                 if (rc <= 0)
315                         return rc;
316
317                 lu_buf_realloc(buf, rc);
318                 if (buf->lb_buf == NULL)
319                         return -ENOMEM;
320
321                 goto again;
322         }
323
324         if (rc == -ENODATA)
325                 rc = 0;
326
327         if (rc <= 0)
328                 return rc;
329
330         if (unlikely(buf->lb_buf == NULL)) {
331                 lu_buf_alloc(buf, rc);
332                 if (buf->lb_buf == NULL)
333                         return -ENOMEM;
334
335                 goto again;
336         }
337
338         return rc;
339 }
340
341 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
342 {
343         __u32 magic;
344         __u32 pattern;
345
346         magic = le32_to_cpu(lmm->lmm_magic);
347         /* If magic crashed, keep it there. Sometime later, during OST-object
348          * orphan handling, if some OST-object(s) back-point to it, it can be
349          * verified and repaired. */
350         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
351                 struct ost_id   oi;
352                 int             rc;
353
354                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
355                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
356                         rc = -EOPNOTSUPP;
357                 else
358                         rc = -EINVAL;
359
360                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
361                        rc == -EINVAL ? "Unknown" : "Unsupported",
362                        magic, POSTID(&oi));
363
364                 return rc;
365         }
366
367         pattern = le32_to_cpu(lmm->lmm_pattern);
368         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
369         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
370                 struct ost_id oi;
371
372                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
373                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
374                        pattern, POSTID(&oi));
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
382 #define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
383 #define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
384 #define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
385
386 struct lfsck_rbtree_node {
387         struct rb_node   lrn_node;
388         __u64            lrn_seq;
389         __u32            lrn_first_oid;
390         atomic_t         lrn_known_count;
391         atomic_t         lrn_accessed_count;
392         void            *lrn_known_bitmap;
393         void            *lrn_accessed_bitmap;
394 };
395
396 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
397                                    __u64 seq, __u32 oid)
398 {
399         if (seq < lrn->lrn_seq)
400                 return -1;
401
402         if (seq > lrn->lrn_seq)
403                 return 1;
404
405         if (oid < lrn->lrn_first_oid)
406                 return -1;
407
408         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
409                 return 1;
410
411         return 0;
412 }
413
414 /* The caller should hold llsd->llsd_rb_lock. */
415 static struct lfsck_rbtree_node *
416 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
417                     const struct lu_fid *fid, bool *exact)
418 {
419         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
420         struct rb_node           *prev  = NULL;
421         struct lfsck_rbtree_node *lrn   = NULL;
422         int                       rc    = 0;
423
424         if (exact != NULL)
425                 *exact = true;
426
427         while (node != NULL) {
428                 prev = node;
429                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
430                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
431                 if (rc < 0)
432                         node = node->rb_left;
433                 else if (rc > 0)
434                         node = node->rb_right;
435                 else
436                         return lrn;
437         }
438
439         if (exact == NULL)
440                 return NULL;
441
442         /* If there is no exactly matched one, then to the next valid one. */
443         *exact = false;
444
445         /* The rbtree is empty. */
446         if (rc == 0)
447                 return NULL;
448
449         if (rc < 0)
450                 return lrn;
451
452         node = rb_next(prev);
453
454         /* The end of the rbtree. */
455         if (node == NULL)
456                 return NULL;
457
458         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
459
460         return lrn;
461 }
462
463 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
464                                                   const struct lu_fid *fid)
465 {
466         struct lfsck_rbtree_node *lrn;
467
468         OBD_ALLOC_PTR(lrn);
469         if (lrn == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
473         if (lrn->lrn_known_bitmap == NULL) {
474                 OBD_FREE_PTR(lrn);
475
476                 return ERR_PTR(-ENOMEM);
477         }
478
479         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
480         if (lrn->lrn_accessed_bitmap == NULL) {
481                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         RB_CLEAR_NODE(&lrn->lrn_node);
488         lrn->lrn_seq = fid_seq(fid);
489         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
490         atomic_set(&lrn->lrn_known_count, 0);
491         atomic_set(&lrn->lrn_accessed_count, 0);
492
493         return lrn;
494 }
495
496 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
497 {
498         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
499         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
500         OBD_FREE_PTR(lrn);
501 }
502
503 /* The caller should hold lock. */
504 static struct lfsck_rbtree_node *
505 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
506                     struct lfsck_rbtree_node *lrn)
507 {
508         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
509         struct rb_node            *parent = NULL;
510         struct lfsck_rbtree_node  *tmp;
511         int                        rc;
512
513         while (*pos != NULL) {
514                 parent = *pos;
515                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
516                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
517                 if (rc < 0)
518                         pos = &(*pos)->rb_left;
519                 else if (rc > 0)
520                         pos = &(*pos)->rb_right;
521                 else
522                         return tmp;
523         }
524
525         rb_link_node(&lrn->lrn_node, parent, pos);
526         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
527
528         return lrn;
529 }
530
531 extern const struct dt_index_operations lfsck_orphan_index_ops;
532
533 static int lfsck_rbtree_setup(const struct lu_env *env,
534                               struct lfsck_component *com)
535 {
536         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
537         struct lfsck_instance           *lfsck  = com->lc_lfsck;
538         struct dt_device                *dev    = lfsck->li_bottom;
539         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
540         struct dt_object                *obj;
541
542         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
543         fid->f_oid = lfsck_dev_idx(dev);
544         fid->f_ver = 0;
545         obj = dt_locate(env, dev, fid);
546         if (IS_ERR(obj))
547                 RETURN(PTR_ERR(obj));
548
549         /* Generate an in-RAM object to stand for the layout rbtree.
550          * Scanning the layout rbtree will be via the iteration over
551          * the object. In the future, the rbtree may be written onto
552          * disk with the object.
553          *
554          * Mark the object to be as exist. */
555         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
556         obj->do_index_ops = &lfsck_orphan_index_ops;
557         llsd->llsd_rb_obj = obj;
558         llsd->llsd_rbtree_valid = 1;
559         dev->dd_record_fid_accessed = 1;
560
561         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
562                lfsck_lfsck2name(lfsck));
563
564         return 0;
565 }
566
567 static void lfsck_rbtree_cleanup(const struct lu_env *env,
568                                  struct lfsck_component *com)
569 {
570         struct lfsck_instance           *lfsck = com->lc_lfsck;
571         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
572         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
573         struct rb_node                  *next;
574         struct lfsck_rbtree_node        *lrn;
575
576         lfsck->li_bottom->dd_record_fid_accessed = 0;
577         /* Invalid the rbtree, then no others will use it. */
578         write_lock(&llsd->llsd_rb_lock);
579         llsd->llsd_rbtree_valid = 0;
580         write_unlock(&llsd->llsd_rb_lock);
581
582         while (node != NULL) {
583                 next = rb_next(node);
584                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
585                 rb_erase(node, &llsd->llsd_rb_root);
586                 lfsck_rbtree_free(lrn);
587                 node = next;
588         }
589
590         if (llsd->llsd_rb_obj != NULL) {
591                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
592                 llsd->llsd_rb_obj = NULL;
593         }
594
595         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
596                lfsck_lfsck2name(lfsck));
597 }
598
599 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
600                                        struct lfsck_component *com,
601                                        const struct lu_fid *fid,
602                                        bool accessed)
603 {
604         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
605         struct lfsck_rbtree_node        *lrn;
606         bool                             insert = false;
607         int                              idx;
608         int                              rc     = 0;
609         ENTRY;
610
611         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
612                 RETURN_EXIT;
613
614         if (!fid_is_idif(fid) && !fid_is_norm(fid))
615                 RETURN_EXIT;
616
617         read_lock(&llsd->llsd_rb_lock);
618         if (!llsd->llsd_rbtree_valid)
619                 GOTO(unlock, rc = 0);
620
621         lrn = lfsck_rbtree_search(llsd, fid, NULL);
622         if (lrn == NULL) {
623                 struct lfsck_rbtree_node *tmp;
624
625                 LASSERT(!insert);
626
627                 read_unlock(&llsd->llsd_rb_lock);
628                 tmp = lfsck_rbtree_new(env, fid);
629                 if (IS_ERR(tmp))
630                         GOTO(out, rc = PTR_ERR(tmp));
631
632                 insert = true;
633                 write_lock(&llsd->llsd_rb_lock);
634                 if (!llsd->llsd_rbtree_valid) {
635                         lfsck_rbtree_free(tmp);
636                         GOTO(unlock, rc = 0);
637                 }
638
639                 lrn = lfsck_rbtree_insert(llsd, tmp);
640                 if (lrn != tmp)
641                         lfsck_rbtree_free(tmp);
642         }
643
644         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
645         /* Any accessed object must be a known object. */
646         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
647                 atomic_inc(&lrn->lrn_known_count);
648         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
649                 atomic_inc(&lrn->lrn_accessed_count);
650
651         GOTO(unlock, rc = 0);
652
653 unlock:
654         if (insert)
655                 write_unlock(&llsd->llsd_rb_lock);
656         else
657                 read_unlock(&llsd->llsd_rb_lock);
658 out:
659         if (rc != 0 && accessed) {
660                 struct lfsck_layout *lo = com->lc_file_ram;
661
662                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
663                        "bitmap, and will cause incorrect LFSCK OST-object "
664                        "handling, so disable it to cancel orphan handling "
665                        "for related device. rc = %d\n",
666                        lfsck_lfsck2name(com->lc_lfsck), rc);
667
668                 lo->ll_flags |= LF_INCOMPLETE;
669                 lfsck_rbtree_cleanup(env, com);
670         }
671 }
672
673 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
674                                    const struct lfsck_layout *src)
675 {
676         int i;
677
678         des->ll_magic = le32_to_cpu(src->ll_magic);
679         des->ll_status = le32_to_cpu(src->ll_status);
680         des->ll_flags = le32_to_cpu(src->ll_flags);
681         des->ll_success_count = le32_to_cpu(src->ll_success_count);
682         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
683         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
684         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
685         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
686         des->ll_time_last_checkpoint =
687                                 le64_to_cpu(src->ll_time_last_checkpoint);
688         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
689         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
690         des->ll_pos_first_inconsistent =
691                         le64_to_cpu(src->ll_pos_first_inconsistent);
692         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
693         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
694         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
695         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
696         for (i = 0; i < LLIT_MAX; i++)
697                 des->ll_objs_repaired[i] =
698                                 le64_to_cpu(src->ll_objs_repaired[i]);
699         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
700 }
701
702 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
703                                    const struct lfsck_layout *src)
704 {
705         int i;
706
707         des->ll_magic = cpu_to_le32(src->ll_magic);
708         des->ll_status = cpu_to_le32(src->ll_status);
709         des->ll_flags = cpu_to_le32(src->ll_flags);
710         des->ll_success_count = cpu_to_le32(src->ll_success_count);
711         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
712         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
713         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
714         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
715         des->ll_time_last_checkpoint =
716                                 cpu_to_le64(src->ll_time_last_checkpoint);
717         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
718         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
719         des->ll_pos_first_inconsistent =
720                         cpu_to_le64(src->ll_pos_first_inconsistent);
721         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
722         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
723         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
724         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
725         for (i = 0; i < LLIT_MAX; i++)
726                 des->ll_objs_repaired[i] =
727                                 cpu_to_le64(src->ll_objs_repaired[i]);
728         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
729 }
730
731 /**
732  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
733  * \retval 0: succeed.
734  * \retval -ve: failed cases.
735  */
736 static int lfsck_layout_load(const struct lu_env *env,
737                              struct lfsck_component *com)
738 {
739         struct lfsck_layout             *lo     = com->lc_file_ram;
740         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
741         ssize_t                          size   = com->lc_file_size;
742         loff_t                           pos    = 0;
743         int                              rc;
744
745         rc = dbo->dbo_read(env, com->lc_obj,
746                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
747                            BYPASS_CAPA);
748         if (rc == 0) {
749                 return -ENOENT;
750         } else if (rc < 0) {
751                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
752                        lfsck_lfsck2name(com->lc_lfsck), rc);
753                 return rc;
754         } else if (rc != size) {
755                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
756                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
757                 return 1;
758         }
759
760         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
761         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
762                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
763                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
764                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
765                 return 1;
766         }
767
768         return 0;
769 }
770
771 static int lfsck_layout_store(const struct lu_env *env,
772                               struct lfsck_component *com)
773 {
774         struct dt_object         *obj           = com->lc_obj;
775         struct lfsck_instance    *lfsck         = com->lc_lfsck;
776         struct lfsck_layout      *lo            = com->lc_file_disk;
777         struct thandle           *handle;
778         ssize_t                   size          = com->lc_file_size;
779         loff_t                    pos           = 0;
780         int                       rc;
781         ENTRY;
782
783         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
784         handle = dt_trans_create(env, lfsck->li_bottom);
785         if (IS_ERR(handle))
786                 GOTO(log, rc = PTR_ERR(handle));
787
788         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
789                                      pos, handle);
790         if (rc != 0)
791                 GOTO(out, rc);
792
793         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
794         if (rc != 0)
795                 GOTO(out, rc);
796
797         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
798                              handle);
799
800         GOTO(out, rc);
801
802 out:
803         dt_trans_stop(env, lfsck->li_bottom, handle);
804
805 log:
806         if (rc != 0)
807                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
808                        lfsck_lfsck2name(lfsck), rc);
809         return rc;
810 }
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         if (bk->lb_param & LPF_DRYRUN)
908                 return 0;
909
910         memset(la, 0, sizeof(*la));
911         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
912         la->la_valid = LA_MODE | LA_UID | LA_GID;
913         dof->dof_type = dt_mode_to_dft(S_IFREG);
914
915         th = dt_trans_create(env, dt);
916         if (IS_ERR(th))
917                 GOTO(log, rc = PTR_ERR(th));
918
919         rc = dt_declare_create(env, obj, la, NULL, dof, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         rc = dt_declare_record_write(env, obj,
924                                      lfsck_buf_get(env, &lastid,
925                                                    sizeof(lastid)),
926                                      pos, th);
927         if (rc != 0)
928                 GOTO(stop, rc);
929
930         rc = dt_trans_start_local(env, dt, th);
931         if (rc != 0)
932                 GOTO(stop, rc);
933
934         dt_write_lock(env, obj, 0);
935         if (likely(dt_object_exists(obj) == 0)) {
936                 rc = dt_create(env, obj, la, NULL, dof, th);
937                 if (rc == 0)
938                         rc = dt_record_write(env, obj,
939                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
940                                 &pos, th);
941         }
942         dt_write_unlock(env, obj);
943
944         GOTO(stop, rc);
945
946 stop:
947         dt_trans_stop(env, dt, th);
948
949 log:
950         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
951                LPX64": rc = %d\n",
952                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
953
954         return rc;
955 }
956
957 static int
958 lfsck_layout_lastid_reload(const struct lu_env *env,
959                            struct lfsck_component *com,
960                            struct lfsck_layout_seq *lls)
961 {
962         __u64   lastid;
963         loff_t  pos     = 0;
964         int     rc;
965
966         dt_read_lock(env, lls->lls_lastid_obj, 0);
967         rc = dt_record_read(env, lls->lls_lastid_obj,
968                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
969         dt_read_unlock(env, lls->lls_lastid_obj);
970         if (unlikely(rc != 0))
971                 return rc;
972
973         lastid = le64_to_cpu(lastid);
974         if (lastid < lls->lls_lastid_known) {
975                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
976                 struct lfsck_layout     *lo     = com->lc_file_ram;
977
978                 lls->lls_lastid = lls->lls_lastid_known;
979                 lls->lls_dirty = 1;
980                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
981                         LASSERT(lfsck->li_out_notify != NULL);
982
983                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
984                                              LE_LASTID_REBUILDING);
985                         lo->ll_flags |= LF_CRASHED_LASTID;
986
987                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
988                                "LAST_ID file (1) for the sequence "LPX64
989                                ", old value "LPU64", known value "LPU64"\n",
990                                lfsck_lfsck2name(lfsck), lls->lls_seq,
991                                lastid, lls->lls_lastid);
992                 }
993         } else if (lastid >= lls->lls_lastid) {
994                 lls->lls_lastid = lastid;
995                 lls->lls_dirty = 0;
996         }
997
998         return 0;
999 }
1000
1001 static int
1002 lfsck_layout_lastid_store(const struct lu_env *env,
1003                           struct lfsck_component *com)
1004 {
1005         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1006         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1007         struct dt_device                *dt     = lfsck->li_bottom;
1008         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1009         struct lfsck_layout_seq         *lls;
1010         struct thandle                  *th;
1011         __u64                            lastid;
1012         int                              rc     = 0;
1013         int                              rc1    = 0;
1014
1015         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1016                 loff_t pos = 0;
1017
1018                 if (!lls->lls_dirty)
1019                         continue;
1020
1021                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1022                        "<seq> "LPX64" as <oid> "LPU64"\n",
1023                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1024
1025                 if (bk->lb_param & LPF_DRYRUN) {
1026                         lls->lls_dirty = 0;
1027                         continue;
1028                 }
1029
1030                 th = dt_trans_create(env, dt);
1031                 if (IS_ERR(th)) {
1032                         rc1 = PTR_ERR(th);
1033                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1034                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1035                                lfsck_lfsck2name(com->lc_lfsck),
1036                                lls->lls_seq, rc1);
1037                         continue;
1038                 }
1039
1040                 lastid = cpu_to_le64(lls->lls_lastid);
1041                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1042                                              lfsck_buf_get(env, &lastid,
1043                                                            sizeof(lastid)),
1044                                              pos, th);
1045                 if (rc != 0)
1046                         goto stop;
1047
1048                 rc = dt_trans_start_local(env, dt, th);
1049                 if (rc != 0)
1050                         goto stop;
1051
1052                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1053                 rc = dt_record_write(env, lls->lls_lastid_obj,
1054                                      lfsck_buf_get(env, &lastid,
1055                                      sizeof(lastid)), &pos, th);
1056                 dt_write_unlock(env, lls->lls_lastid_obj);
1057                 if (rc == 0)
1058                         lls->lls_dirty = 0;
1059
1060 stop:
1061                 dt_trans_stop(env, dt, th);
1062                 if (rc != 0) {
1063                         rc1 = rc;
1064                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1065                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1066                                lfsck_lfsck2name(com->lc_lfsck),
1067                                lls->lls_seq, rc1);
1068                 }
1069         }
1070
1071         return rc1;
1072 }
1073
1074 static int
1075 lfsck_layout_lastid_load(const struct lu_env *env,
1076                          struct lfsck_component *com,
1077                          struct lfsck_layout_seq *lls)
1078 {
1079         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1080         struct lfsck_layout     *lo     = com->lc_file_ram;
1081         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1082         struct dt_object        *obj;
1083         loff_t                   pos    = 0;
1084         int                      rc;
1085         ENTRY;
1086
1087         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1088         obj = dt_locate(env, lfsck->li_bottom, fid);
1089         if (IS_ERR(obj))
1090                 RETURN(PTR_ERR(obj));
1091
1092         /* LAST_ID crashed, to be rebuilt */
1093         if (dt_object_exists(obj) == 0) {
1094                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1095                         LASSERT(lfsck->li_out_notify != NULL);
1096
1097                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1098                                              LE_LASTID_REBUILDING);
1099                         lo->ll_flags |= LF_CRASHED_LASTID;
1100
1101                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1102                                "LAST_ID file for sequence "LPX64"\n",
1103                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1104
1105                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1106                             cfs_fail_val > 0) {
1107                                 struct l_wait_info lwi = LWI_TIMEOUT(
1108                                                 cfs_time_seconds(cfs_fail_val),
1109                                                 NULL, NULL);
1110
1111                                 up_write(&com->lc_sem);
1112                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1113                                              !thread_is_running(&lfsck->li_thread),
1114                                              &lwi);
1115                                 down_write(&com->lc_sem);
1116                         }
1117                 }
1118
1119                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1120         } else {
1121                 dt_read_lock(env, obj, 0);
1122                 rc = dt_read(env, obj,
1123                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1124                         &pos);
1125                 dt_read_unlock(env, obj);
1126                 if (rc != 0 && rc != sizeof(__u64))
1127                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1128
1129                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1130                         LASSERT(lfsck->li_out_notify != NULL);
1131
1132                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1133                                              LE_LASTID_REBUILDING);
1134                         lo->ll_flags |= LF_CRASHED_LASTID;
1135
1136                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1137                                "LAST_ID file for the sequence "LPX64
1138                                ": rc = %d\n",
1139                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1140                 }
1141
1142                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1143                 rc = 0;
1144         }
1145
1146         GOTO(out, rc);
1147
1148 out:
1149         if (rc != 0)
1150                 lfsck_object_put(env, obj);
1151         else
1152                 lls->lls_lastid_obj = obj;
1153
1154         return rc;
1155 }
1156
1157 static void lfsck_layout_record_failure(const struct lu_env *env,
1158                                                  struct lfsck_instance *lfsck,
1159                                                  struct lfsck_layout *lo)
1160 {
1161         lo->ll_objs_failed_phase1++;
1162         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1163                 lo->ll_pos_first_inconsistent =
1164                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1165                                                         lfsck->li_di_oit);
1166
1167                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1168                        "inconsistency at the pos ["LPU64"]\n",
1169                        lfsck_lfsck2name(lfsck),
1170                        lo->ll_pos_first_inconsistent);
1171         }
1172 }
1173
1174 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1175                                                struct ptlrpc_request *req,
1176                                                void *args, int rc)
1177 {
1178         struct lfsck_async_interpret_args *laia = args;
1179         struct lfsck_component            *com  = laia->laia_com;
1180         struct lfsck_layout_master_data   *llmd = com->lc_data;
1181         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1182         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1183         struct lfsck_request              *lr   = laia->laia_lr;
1184
1185         switch (lr->lr_event) {
1186         case LE_START:
1187                 if (rc != 0) {
1188                         struct lfsck_layout *lo = com->lc_file_ram;
1189
1190                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1191                                "start: rc = %d\n",
1192                                lfsck_lfsck2name(com->lc_lfsck),
1193                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1194                                ltd->ltd_index, rc);
1195                         lo->ll_flags |= LF_INCOMPLETE;
1196                         break;
1197                 }
1198
1199                 spin_lock(&ltds->ltd_lock);
1200                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1201                         spin_unlock(&ltds->ltd_lock);
1202                         break;
1203                 }
1204
1205                 if (lr->lr_flags & LEF_TO_OST) {
1206                         if (list_empty(&ltd->ltd_layout_list))
1207                                 list_add_tail(&ltd->ltd_layout_list,
1208                                               &llmd->llmd_ost_list);
1209                         if (list_empty(&ltd->ltd_layout_phase_list))
1210                                 list_add_tail(&ltd->ltd_layout_phase_list,
1211                                               &llmd->llmd_ost_phase1_list);
1212                 } else {
1213                         if (list_empty(&ltd->ltd_layout_list))
1214                                 list_add_tail(&ltd->ltd_layout_list,
1215                                               &llmd->llmd_mdt_list);
1216                         if (list_empty(&ltd->ltd_layout_phase_list))
1217                                 list_add_tail(&ltd->ltd_layout_phase_list,
1218                                               &llmd->llmd_mdt_phase1_list);
1219                 }
1220                 spin_unlock(&ltds->ltd_lock);
1221                 break;
1222         case LE_STOP:
1223         case LE_PHASE1_DONE:
1224         case LE_PHASE2_DONE:
1225         case LE_PEER_EXIT:
1226                 if (rc != 0 && rc != -EALREADY)
1227                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1228                                "event = %d, rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck),
1230                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1231                                ltd->ltd_index, lr->lr_event, rc);
1232                 break;
1233         case LE_QUERY: {
1234                 struct lfsck_reply *reply;
1235
1236                 if (rc != 0) {
1237                         spin_lock(&ltds->ltd_lock);
1238                         list_del_init(&ltd->ltd_layout_phase_list);
1239                         list_del_init(&ltd->ltd_layout_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 }
1243
1244                 reply = req_capsule_server_get(&req->rq_pill,
1245                                                &RMF_LFSCK_REPLY);
1246                 if (reply == NULL) {
1247                         rc = -EPROTO;
1248                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1249                                lfsck_lfsck2name(com->lc_lfsck), rc);
1250                         spin_lock(&ltds->ltd_lock);
1251                         list_del_init(&ltd->ltd_layout_phase_list);
1252                         list_del_init(&ltd->ltd_layout_list);
1253                         spin_unlock(&ltds->ltd_lock);
1254                         break;
1255                 }
1256
1257                 switch (reply->lr_status) {
1258                 case LS_SCANNING_PHASE1:
1259                         break;
1260                 case LS_SCANNING_PHASE2:
1261                         spin_lock(&ltds->ltd_lock);
1262                         list_del_init(&ltd->ltd_layout_phase_list);
1263                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1264                                 spin_unlock(&ltds->ltd_lock);
1265                                 break;
1266                         }
1267
1268                         if (lr->lr_flags & LEF_TO_OST)
1269                                 list_add_tail(&ltd->ltd_layout_phase_list,
1270                                               &llmd->llmd_ost_phase2_list);
1271                         else
1272                                 list_add_tail(&ltd->ltd_layout_phase_list,
1273                                               &llmd->llmd_mdt_phase2_list);
1274                         spin_unlock(&ltds->ltd_lock);
1275                         break;
1276                 default:
1277                         spin_lock(&ltds->ltd_lock);
1278                         list_del_init(&ltd->ltd_layout_phase_list);
1279                         list_del_init(&ltd->ltd_layout_list);
1280                         spin_unlock(&ltds->ltd_lock);
1281                         break;
1282                 }
1283                 break;
1284         }
1285         default:
1286                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1287                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1288                 break;
1289         }
1290
1291         if (!laia->laia_shared) {
1292                 lfsck_tgt_put(ltd);
1293                 lfsck_component_put(env, com);
1294         }
1295
1296         return 0;
1297 }
1298
1299 static int lfsck_layout_master_query_others(const struct lu_env *env,
1300                                             struct lfsck_component *com)
1301 {
1302         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1303         struct lfsck_request              *lr    = &info->lti_lr;
1304         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1305         struct lfsck_instance             *lfsck = com->lc_lfsck;
1306         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1307         struct ptlrpc_request_set         *set;
1308         struct lfsck_tgt_descs            *ltds;
1309         struct lfsck_tgt_desc             *ltd;
1310         struct list_head                  *head;
1311         int                                rc    = 0;
1312         int                                rc1   = 0;
1313         ENTRY;
1314
1315         set = ptlrpc_prep_set();
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318
1319         llmd->llmd_touch_gen++;
1320         memset(lr, 0, sizeof(*lr));
1321         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1322         lr->lr_event = LE_QUERY;
1323         lr->lr_active = LFSCK_TYPE_LAYOUT;
1324         laia->laia_com = com;
1325         laia->laia_lr = lr;
1326         laia->laia_shared = 0;
1327
1328         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1329                 ltds = &lfsck->li_mdt_descs;
1330                 lr->lr_flags = 0;
1331                 head = &llmd->llmd_mdt_phase1_list;
1332         } else {
1333
1334 again:
1335                 ltds = &lfsck->li_ost_descs;
1336                 lr->lr_flags = LEF_TO_OST;
1337                 head = &llmd->llmd_ost_phase1_list;
1338         }
1339
1340         laia->laia_ltds = ltds;
1341         spin_lock(&ltds->ltd_lock);
1342         while (!list_empty(head)) {
1343                 ltd = list_entry(head->next,
1344                                  struct lfsck_tgt_desc,
1345                                  ltd_layout_phase_list);
1346                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1347                         break;
1348
1349                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1350                 list_move_tail(&ltd->ltd_layout_phase_list, head);
1351                 atomic_inc(&ltd->ltd_ref);
1352                 laia->laia_ltd = ltd;
1353                 spin_unlock(&ltds->ltd_lock);
1354                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1355                                          lfsck_layout_master_async_interpret,
1356                                          laia, LFSCK_QUERY);
1357                 if (rc != 0) {
1358                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1359                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1360                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1361                                ltd->ltd_index, rc);
1362                         lfsck_tgt_put(ltd);
1363                         rc1 = rc;
1364                 }
1365                 spin_lock(&ltds->ltd_lock);
1366         }
1367         spin_unlock(&ltds->ltd_lock);
1368
1369         rc = ptlrpc_set_wait(set);
1370         if (rc < 0) {
1371                 ptlrpc_set_destroy(set);
1372                 RETURN(rc);
1373         }
1374
1375         if (!(lr->lr_flags & LEF_TO_OST) &&
1376             list_empty(&llmd->llmd_mdt_phase1_list))
1377                 goto again;
1378
1379         ptlrpc_set_destroy(set);
1380
1381         RETURN(rc1 != 0 ? rc1 : rc);
1382 }
1383
1384 static inline bool
1385 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1386 {
1387         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1388                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1389                 list_empty(&llmd->llmd_ost_phase1_list));
1390 }
1391
1392 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1393                                              struct lfsck_component *com,
1394                                              struct lfsck_request *lr)
1395 {
1396         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1397         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1398         struct lfsck_instance             *lfsck = com->lc_lfsck;
1399         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1400         struct lfsck_layout               *lo    = com->lc_file_ram;
1401         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1402         struct ptlrpc_request_set         *set;
1403         struct lfsck_tgt_descs            *ltds;
1404         struct lfsck_tgt_desc             *ltd;
1405         struct lfsck_tgt_desc             *next;
1406         struct list_head                  *head;
1407         __u32                              idx;
1408         int                                rc    = 0;
1409         ENTRY;
1410
1411         set = ptlrpc_prep_set();
1412         if (set == NULL)
1413                 RETURN(-ENOMEM);
1414
1415         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1416         lr->lr_active = LFSCK_TYPE_LAYOUT;
1417         laia->laia_com = com;
1418         laia->laia_lr = lr;
1419         laia->laia_shared = 0;
1420         switch (lr->lr_event) {
1421         case LE_START:
1422                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1423                 ltds = &lfsck->li_ost_descs;
1424                 laia->laia_ltds = ltds;
1425                 down_read(&ltds->ltd_rw_sem);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = lfsck_tgt_get(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         laia->laia_ltd = ltd;
1431                         ltd->ltd_layout_done = 0;
1432                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1433                                         lfsck_layout_master_async_interpret,
1434                                         laia, LFSCK_NOTIFY);
1435                         if (rc != 0) {
1436                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1437                                        "notify %s %x for start: rc = %d\n",
1438                                        lfsck_lfsck2name(lfsck),
1439                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1440                                        "MDT", idx, rc);
1441                                 lfsck_tgt_put(ltd);
1442                                 lo->ll_flags |= LF_INCOMPLETE;
1443                         }
1444                 }
1445                 up_read(&ltds->ltd_rw_sem);
1446
1447                 /* Sync up */
1448                 rc = ptlrpc_set_wait(set);
1449                 if (rc < 0) {
1450                         ptlrpc_set_destroy(set);
1451                         RETURN(rc);
1452                 }
1453
1454                 if (!(bk->lb_param & LPF_ALL_TGT))
1455                         break;
1456
1457                 /* link other MDT targets locallly. */
1458                 ltds = &lfsck->li_mdt_descs;
1459                 spin_lock(&ltds->ltd_lock);
1460                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1461                         ltd = LTD_TGT(ltds, idx);
1462                         LASSERT(ltd != NULL);
1463
1464                         if (!list_empty(&ltd->ltd_layout_list))
1465                                 continue;
1466
1467                         list_add_tail(&ltd->ltd_layout_list,
1468                                       &llmd->llmd_mdt_list);
1469                         list_add_tail(&ltd->ltd_layout_phase_list,
1470                                       &llmd->llmd_mdt_phase1_list);
1471                 }
1472                 spin_unlock(&ltds->ltd_lock);
1473                 break;
1474         case LE_STOP:
1475         case LE_PHASE2_DONE:
1476         case LE_PEER_EXIT: {
1477                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1478                 if (bk->lb_param & LPF_ALL_TGT) {
1479                         head = &llmd->llmd_mdt_list;
1480                         ltds = &lfsck->li_mdt_descs;
1481                         if (lr->lr_event == LE_STOP) {
1482                                 /* unlink other MDT targets locallly. */
1483                                 spin_lock(&ltds->ltd_lock);
1484                                 list_for_each_entry_safe(ltd, next, head,
1485                                                          ltd_layout_list) {
1486                                         list_del_init(&ltd->ltd_layout_phase_list);
1487                                         list_del_init(&ltd->ltd_layout_list);
1488                                 }
1489                                 spin_unlock(&ltds->ltd_lock);
1490
1491                                 lr->lr_flags |= LEF_TO_OST;
1492                                 head = &llmd->llmd_ost_list;
1493                                 ltds = &lfsck->li_ost_descs;
1494                         } else {
1495                                 lr->lr_flags &= ~LEF_TO_OST;
1496                         }
1497                 } else {
1498                         lr->lr_flags |= LEF_TO_OST;
1499                         head = &llmd->llmd_ost_list;
1500                         ltds = &lfsck->li_ost_descs;
1501                 }
1502
1503 again:
1504                 laia->laia_ltds = ltds;
1505                 spin_lock(&ltds->ltd_lock);
1506                 while (!list_empty(head)) {
1507                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1508                                          ltd_layout_list);
1509                         if (!list_empty(&ltd->ltd_layout_phase_list))
1510                                 list_del_init(&ltd->ltd_layout_phase_list);
1511                         list_del_init(&ltd->ltd_layout_list);
1512                         atomic_inc(&ltd->ltd_ref);
1513                         laia->laia_ltd = ltd;
1514                         spin_unlock(&ltds->ltd_lock);
1515                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1516                                         lfsck_layout_master_async_interpret,
1517                                         laia, LFSCK_NOTIFY);
1518                         if (rc != 0) {
1519                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1520                                        "notify %s %x for stop/phase2_done/"
1521                                        "peer_exit: rc = %d\n",
1522                                        lfsck_lfsck2name(lfsck),
1523                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1524                                        "MDT", ltd->ltd_index, rc);
1525                                 lfsck_tgt_put(ltd);
1526                         }
1527                         spin_lock(&ltds->ltd_lock);
1528                 }
1529                 spin_unlock(&ltds->ltd_lock);
1530
1531                 rc = ptlrpc_set_wait(set);
1532                 if (rc < 0) {
1533                         ptlrpc_set_destroy(set);
1534                         RETURN(rc);
1535                 }
1536
1537                 if (!(lr->lr_flags & LEF_TO_OST)) {
1538                         lr->lr_flags |= LEF_TO_OST;
1539                         head = &llmd->llmd_ost_list;
1540                         ltds = &lfsck->li_ost_descs;
1541                         goto again;
1542                 }
1543                 break;
1544         }
1545         case LE_PHASE1_DONE:
1546                 llmd->llmd_touch_gen++;
1547                 ltds = &lfsck->li_mdt_descs;
1548                 laia->laia_ltds = ltds;
1549                 spin_lock(&ltds->ltd_lock);
1550                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1551                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1552                                          struct lfsck_tgt_desc,
1553                                          ltd_layout_phase_list);
1554                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1555                                 break;
1556
1557                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1558                         list_move_tail(&ltd->ltd_layout_phase_list,
1559                                        &llmd->llmd_mdt_phase1_list);
1560                         atomic_inc(&ltd->ltd_ref);
1561                         laia->laia_ltd = ltd;
1562                         spin_unlock(&ltds->ltd_lock);
1563                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1564                                         lfsck_layout_master_async_interpret,
1565                                         laia, LFSCK_NOTIFY);
1566                         if (rc != 0) {
1567                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1568                                        "notify MDT %x for phase1_done: "
1569                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1570                                        ltd->ltd_index, rc);
1571                                 lfsck_tgt_put(ltd);
1572                         }
1573                         spin_lock(&ltds->ltd_lock);
1574                 }
1575                 spin_unlock(&ltds->ltd_lock);
1576                 break;
1577         default:
1578                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1579                        lfsck_lfsck2name(lfsck), lr->lr_event);
1580                 rc = -EINVAL;
1581                 break;
1582         }
1583
1584         rc = ptlrpc_set_wait(set);
1585         ptlrpc_set_destroy(set);
1586
1587         RETURN(rc);
1588 }
1589
1590 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1591                                            struct lfsck_component *com,
1592                                            int rc)
1593 {
1594         struct lfsck_instance   *lfsck = com->lc_lfsck;
1595         struct lfsck_layout     *lo    = com->lc_file_ram;
1596         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1597
1598         down_write(&com->lc_sem);
1599         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1600                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1601         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1602         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1603
1604         if (rc > 0) {
1605                 com->lc_journal = 0;
1606                 if (lo->ll_flags & LF_INCOMPLETE)
1607                         lo->ll_status = LS_PARTIAL;
1608                 else
1609                         lo->ll_status = LS_COMPLETED;
1610                 if (!(bk->lb_param & LPF_DRYRUN))
1611                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1612                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1613                 lo->ll_success_count++;
1614         } else if (rc == 0) {
1615                 lo->ll_status = lfsck->li_status;
1616                 if (lo->ll_status == 0)
1617                         lo->ll_status = LS_STOPPED;
1618         } else {
1619                 lo->ll_status = LS_FAILED;
1620         }
1621
1622         rc = lfsck_layout_store(env, com);
1623         up_write(&com->lc_sem);
1624
1625         return rc;
1626 }
1627
1628 static int lfsck_layout_lock(const struct lu_env *env,
1629                              struct lfsck_component *com,
1630                              struct dt_object *obj,
1631                              struct lustre_handle *lh, __u64 bits)
1632 {
1633         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1634         ldlm_policy_data_t              *policy = &info->lti_policy;
1635         struct ldlm_res_id              *resid  = &info->lti_resid;
1636         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1637         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1638         int                              rc;
1639
1640         LASSERT(lfsck->li_namespace != NULL);
1641
1642         memset(policy, 0, sizeof(*policy));
1643         policy->l_inodebits.bits = bits;
1644         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1645         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1646                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1647                                     ldlm_completion_ast, NULL, NULL, 0,
1648                                     LVB_T_NONE, NULL, lh);
1649         if (rc == ELDLM_OK) {
1650                 rc = 0;
1651         } else {
1652                 memset(lh, 0, sizeof(*lh));
1653                 rc = -EIO;
1654         }
1655
1656         return rc;
1657 }
1658
1659 static void lfsck_layout_unlock(struct lustre_handle *lh)
1660 {
1661         if (lustre_handle_is_used(lh)) {
1662                 ldlm_lock_decref(lh, LCK_EX);
1663                 memset(lh, 0, sizeof(*lh));
1664         }
1665 }
1666
1667 static int lfsck_layout_trans_stop(const struct lu_env *env,
1668                                    struct dt_device *dev,
1669                                    struct thandle *handle, int result)
1670 {
1671         int rc;
1672
1673         handle->th_result = result;
1674         rc = dt_trans_stop(env, dev, handle);
1675         if (rc > 0)
1676                 rc = 0;
1677         else if (rc == 0)
1678                 rc = 1;
1679
1680         return rc;
1681 }
1682
1683 /**
1684  * Get the system default stripe size.
1685  *
1686  * \param[in] env       pointer to the thread context
1687  * \param[in] lfsck     pointer to the lfsck instance
1688  * \param[out] size     pointer to the default stripe size
1689  *
1690  * \retval              0 for success
1691  * \retval              negative error number on failure
1692  */
1693 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1694                                            struct lfsck_instance *lfsck,
1695                                            __u32 *size)
1696 {
1697         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1698         struct dt_object        *root;
1699         int                      rc;
1700
1701         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1702         if (IS_ERR(root))
1703                 return PTR_ERR(root);
1704
1705         /* Get the default stripe size via xattr_get on the backend root. */
1706         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1707                           XATTR_NAME_LOV, BYPASS_CAPA);
1708         if (rc > 0) {
1709                 /* The lum->lmm_stripe_size is LE mode. The *size also
1710                  * should be LE mode. So it is unnecessary to convert. */
1711                 *size = lum->lmm_stripe_size;
1712                 rc = 0;
1713         } else if (unlikely(rc == 0)) {
1714                 rc = -EINVAL;
1715         }
1716
1717         lfsck_object_put(env, root);
1718
1719         return rc;
1720 }
1721
1722 /**
1723  * \retval       +1: repaired
1724  * \retval        0: did nothing
1725  * \retval      -ve: on error
1726  */
1727 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1728                                      struct thandle *handle,
1729                                      struct dt_object *parent,
1730                                      struct lu_fid *cfid,
1731                                      struct lu_buf *buf,
1732                                      struct lov_ost_data_v1 *slot,
1733                                      int fl, __u32 ost_idx)
1734 {
1735         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1736         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1737         struct lu_buf            ea_buf;
1738         int                      rc;
1739         __u32                    magic;
1740         __u16                    count;
1741
1742         magic = le32_to_cpu(lmm->lmm_magic);
1743         count = le16_to_cpu(lmm->lmm_stripe_count);
1744
1745         fid_to_ostid(cfid, oi);
1746         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1747         slot->l_ost_gen = cpu_to_le32(0);
1748         slot->l_ost_idx = cpu_to_le32(ost_idx);
1749
1750         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1751                 struct lov_ost_data_v1 *objs;
1752                 int                     i;
1753
1754                 if (magic == LOV_MAGIC_V1)
1755                         objs = &lmm->lmm_objects[0];
1756                 else
1757                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1758                 for (i = 0; i < count; i++, objs++) {
1759                         if (objs != slot && lovea_slot_is_dummy(objs))
1760                                 break;
1761                 }
1762
1763                 /* If the @slot is the last dummy slot to be refilled,
1764                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1765                 if (i == count)
1766                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1767         }
1768
1769         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1770         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1771                           BYPASS_CAPA);
1772         if (rc == 0)
1773                 rc = 1;
1774
1775         return rc;
1776 }
1777
1778 /**
1779  * \retval       +1: repaired
1780  * \retval        0: did nothing
1781  * \retval      -ve: on error
1782  */
1783 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1784                                      struct lfsck_instance *lfsck,
1785                                      struct thandle *handle,
1786                                      struct dt_object *parent,
1787                                      struct lu_fid *cfid,
1788                                      struct lu_buf *buf, int fl,
1789                                      __u32 ost_idx, __u32 ea_off, bool reset)
1790 {
1791         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1792         struct lov_ost_data_v1  *objs;
1793         int                      rc;
1794         __u16                    count;
1795         bool                     hole   = false;
1796         ENTRY;
1797
1798         if (fl == LU_XATTR_CREATE || reset) {
1799                 __u32 pattern = LOV_PATTERN_RAID0;
1800
1801                 count = ea_off + 1;
1802                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1803
1804                 if (ea_off != 0 || reset) {
1805                         pattern |= LOV_PATTERN_F_HOLE;
1806                         hole = true;
1807                 }
1808
1809                 memset(lmm, 0, buf->lb_len);
1810                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1811                 lmm->lmm_pattern = cpu_to_le32(pattern);
1812                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1813                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1814
1815                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1816                                                      &lmm->lmm_stripe_size);
1817                 if (rc != 0)
1818                         RETURN(rc);
1819
1820                 objs = &lmm->lmm_objects[ea_off];
1821         } else {
1822                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1823                 int     gap;
1824
1825                 count = le16_to_cpu(lmm->lmm_stripe_count);
1826                 if (magic == LOV_MAGIC_V1)
1827                         objs = &lmm->lmm_objects[count];
1828                 else
1829                         objs = &((struct lov_mds_md_v3 *)lmm)->
1830                                                         lmm_objects[count];
1831
1832                 gap = ea_off - count;
1833                 if (gap >= 0)
1834                         count = ea_off + 1;
1835                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1836
1837                 if (gap > 0) {
1838                         memset(objs, 0, gap * sizeof(*objs));
1839                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1840                         hole = true;
1841                 }
1842
1843                 lmm->lmm_layout_gen =
1844                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1845                 objs += gap;
1846         }
1847
1848         lmm->lmm_stripe_count = cpu_to_le16(count);
1849         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1850                                        fl, ost_idx);
1851
1852         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1853                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1854                "reset %s, %s LOV EA hole: rc = %d\n",
1855                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1856                ost_idx, ea_off, fl, reset ? "yes" : "no",
1857                hole ? "with" : "without", rc);
1858
1859         RETURN(rc);
1860 }
1861
1862 /**
1863  * \retval       +1: repaired
1864  * \retval        0: did nothing
1865  * \retval      -ve: on error
1866  */
1867 static int lfsck_layout_update_pfid(const struct lu_env *env,
1868                                     struct lfsck_component *com,
1869                                     struct dt_object *parent,
1870                                     struct lu_fid *cfid,
1871                                     struct dt_device *cdev, __u32 ea_off)
1872 {
1873         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1874         struct dt_object        *child;
1875         struct thandle          *handle;
1876         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1877         struct lu_buf           *buf;
1878         int                      rc     = 0;
1879         ENTRY;
1880
1881         child = lfsck_object_find_by_dev(env, cdev, cfid);
1882         if (IS_ERR(child))
1883                 RETURN(PTR_ERR(child));
1884
1885         handle = dt_trans_create(env, cdev);
1886         if (IS_ERR(handle))
1887                 GOTO(out, rc = PTR_ERR(handle));
1888
1889         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1890         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1891         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1892          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1893          * parent MDT-object's layout EA. */
1894         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1895         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1896
1897         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1898         if (rc != 0)
1899                 GOTO(stop, rc);
1900
1901         rc = dt_trans_start(env, cdev, handle);
1902         if (rc != 0)
1903                 GOTO(stop, rc);
1904
1905         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1906                           BYPASS_CAPA);
1907
1908         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1909
1910 stop:
1911         dt_trans_stop(env, cdev, handle);
1912
1913 out:
1914         lu_object_put(env, &child->do_lu);
1915
1916         return rc;
1917 }
1918
1919 /**
1920  * This function will create the MDT-object with the given (partial) LOV EA.
1921  *
1922  * Under some data corruption cases, the MDT-object of the file may be lost,
1923  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1924  * re-create the MDT-object with the orphan OST-object(s) information.
1925  *
1926  * On the other hand, the LFSCK may has created some OST-object for repairing
1927  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1928  * the old OST-object is there and should replace the former new created OST
1929  * object. Unfortunately, some others have modified such newly created object.
1930  * To keep the data (both new and old), the LFSCK will create MDT-object with
1931  * new FID to reference the original OST-object.
1932  *
1933  * \param[in] env       pointer to the thread context
1934  * \param[in] com       pointer to the lfsck component
1935  * \param[in] ltd       pointer to target device descriptor
1936  * \param[in] rec       pointer to the record for the orphan OST-object
1937  * \param[in] cfid      pointer to FID for the orphan OST-object
1938  * \param[in] infix     additional information, such as the FID for original
1939  *                      MDT-object and the stripe offset in the LOV EA
1940  * \param[in] type      the type for describing why the orphan MDT-object is
1941  *                      created. The rules are as following:
1942  *
1943  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1944  *                      same slot in the layout EA. Then the LFSCK will create
1945  *                      new MDT-object(s) to hold the conflict OST-object(s).
1946  *
1947  *  type "N":           The orphan OST-object does not know which one was the
1948  *                      real parent MDT-object, so the LFSCK uses new FID for
1949  *                      its parent MDT-object.
1950  *
1951  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1952  *                      but does not know the position (the file name) in the
1953  *                      namespace.
1954  *
1955  * The orphan name will be like:
1956  * ${FID}-${infix}-${type}-${conflict_version}
1957  *
1958  * \param[in] ea_off    the stripe offset in the LOV EA
1959  *
1960  * \retval              positive on repaired something
1961  * \retval              0 if needs to repair nothing
1962  * \retval              negative error number on failure
1963  */
1964 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1965                                         struct lfsck_component *com,
1966                                         struct lfsck_tgt_desc *ltd,
1967                                         struct lu_orphan_rec *rec,
1968                                         struct lu_fid *cfid,
1969                                         const char *infix,
1970                                         const char *type,
1971                                         __u32 ea_off)
1972 {
1973         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1974         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1975         char                            *name   = info->lti_key;
1976         struct lu_attr                  *la     = &info->lti_la;
1977         struct dt_object_format         *dof    = &info->lti_dof;
1978         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1979         struct lu_fid                   *pfid   = &rec->lor_fid;
1980         struct lu_fid                   *tfid   = &info->lti_fid3;
1981         struct dt_device                *next   = lfsck->li_next;
1982         struct dt_object                *pobj   = NULL;
1983         struct dt_object                *cobj   = NULL;
1984         struct thandle                  *th     = NULL;
1985         struct lu_buf                    pbuf   = { 0 };
1986         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1987         struct lu_buf                    lov_buf;
1988         struct lustre_handle             lh     = { 0 };
1989         struct linkea_data               ldata  = { 0 };
1990         struct lu_buf                    linkea_buf;
1991         const struct lu_name            *pname;
1992         int                              size   = 0;
1993         int                              idx    = 0;
1994         int                              rc     = 0;
1995         ENTRY;
1996
1997         /* Create .lustre/lost+found/MDTxxxx when needed. */
1998         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1999                 rc = lfsck_create_lpf(env, lfsck);
2000                 if (rc != 0)
2001                         GOTO(log, rc);
2002         }
2003
2004         if (fid_is_zero(pfid)) {
2005                 struct filter_fid *ff = &info->lti_new_pfid;
2006
2007                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2008                 if (rc != 0)
2009                         RETURN(rc);
2010
2011                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2012                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2013                 /* Currently, the filter_fid::ff_parent::f_ver is not the
2014                  * real parent MDT-object's FID::f_ver, instead it is the
2015                  * OST-object index in its parent MDT-object's layout EA. */
2016                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
2017                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
2018                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2019                 if (IS_ERR(cobj))
2020                         GOTO(log, rc = PTR_ERR(cobj));
2021         }
2022
2023         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
2024         if (IS_ERR(pobj))
2025                 GOTO(put, rc = PTR_ERR(pobj));
2026
2027         LASSERT(infix != NULL);
2028         LASSERT(type != NULL);
2029
2030         do {
2031                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2032                          type, idx++);
2033                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2034                                (const struct dt_key *)name, BYPASS_CAPA);
2035                 if (rc != 0 && rc != -ENOENT)
2036                         GOTO(put, rc);
2037         } while (rc == 0);
2038
2039         rc = linkea_data_new(&ldata,
2040                              &lfsck_env_info(env)->lti_linkea_buf);
2041         if (rc != 0)
2042                 GOTO(put, rc);
2043
2044         pname = lfsck_name_get_const(env, name, strlen(name));
2045         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2046         if (rc != 0)
2047                 GOTO(put, rc);
2048
2049         memset(la, 0, sizeof(*la));
2050         la->la_uid = rec->lor_uid;
2051         la->la_gid = rec->lor_gid;
2052         la->la_mode = S_IFREG | S_IRUSR;
2053         la->la_valid = LA_MODE | LA_UID | LA_GID;
2054
2055         memset(dof, 0, sizeof(*dof));
2056         dof->dof_type = dt_mode_to_dft(S_IFREG);
2057
2058         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2059         if (ea_buf->lb_len < size) {
2060                 lu_buf_realloc(ea_buf, size);
2061                 if (ea_buf->lb_buf == NULL)
2062                         GOTO(put, rc = -ENOMEM);
2063         }
2064
2065         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2066          *
2067          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2068          *      because creating MDT-object for orphan OST-object is rare, we
2069          *      do not much care about the performance. It can be improved in
2070          *      the future when needed. */
2071         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
2072                                MDS_INODELOCK_UPDATE);
2073         if (rc != 0)
2074                 GOTO(put, rc);
2075
2076         th = dt_trans_create(env, next);
2077         if (IS_ERR(th))
2078                 GOTO(unlock, rc = PTR_ERR(th));
2079
2080         /* 1a. Update OST-object's parent information remotely.
2081          *
2082          * If other subsequent modifications failed, then next LFSCK scanning
2083          * will process the OST-object as orphan again with known parent FID. */
2084         if (cobj != NULL) {
2085                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
2086                                           0, th);
2087                 if (rc != 0)
2088                         GOTO(stop, rc);
2089         }
2090
2091         /* 2a. Create the MDT-object locally. */
2092         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2093         if (rc != 0)
2094                 GOTO(stop, rc);
2095
2096         /* 3a. Add layout EA for the MDT-object. */
2097         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2098         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2099                                   LU_XATTR_CREATE, th);
2100         if (rc != 0)
2101                 GOTO(stop, rc);
2102
2103         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2104         dtrec->rec_fid = pfid;
2105         dtrec->rec_type = S_IFREG;
2106         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2107                                (const struct dt_rec *)dtrec,
2108                                (const struct dt_key *)name, th);
2109         if (rc != 0)
2110                 GOTO(stop, rc);
2111
2112         /* 5a. insert linkEA for parent. */
2113         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2114                        ldata.ld_leh->leh_len);
2115         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2116                                   XATTR_NAME_LINK, 0, th);
2117         if (rc != 0)
2118                 GOTO(stop, rc);
2119
2120         rc = dt_trans_start(env, next, th);
2121         if (rc != 0)
2122                 GOTO(stop, rc);
2123
2124         /* 1b. Update OST-object's parent information remotely. */
2125         if (cobj != NULL) {
2126                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
2127                                   BYPASS_CAPA);
2128                 if (rc != 0)
2129                         GOTO(stop, rc);
2130         }
2131
2132         dt_write_lock(env, pobj, 0);
2133         /* 2b. Create the MDT-object locally. */
2134         rc = dt_create(env, pobj, la, NULL, dof, th);
2135         if (rc == 0)
2136                 /* 3b. Add layout EA for the MDT-object. */
2137                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2138                                                &lov_buf, LU_XATTR_CREATE,
2139                                                ltd->ltd_index, ea_off, false);
2140         dt_write_unlock(env, pobj);
2141         if (rc < 0)
2142                 GOTO(stop, rc);
2143
2144         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2145         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
2146                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2147         if (rc != 0)
2148                 GOTO(stop, rc);
2149
2150         /* 5b. insert linkEA for parent. */
2151         rc = dt_xattr_set(env, pobj, &linkea_buf,
2152                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2153
2154         GOTO(stop, rc);
2155
2156 stop:
2157         dt_trans_stop(env, next, th);
2158
2159 unlock:
2160         lfsck_layout_unlock(&lh);
2161
2162 put:
2163         if (cobj != NULL && !IS_ERR(cobj))
2164                 lu_object_put(env, &cobj->do_lu);
2165         if (pobj != NULL && !IS_ERR(pobj))
2166                 lu_object_put(env, &pobj->do_lu);
2167
2168 log:
2169         if (rc < 0)
2170                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2171                        "recreate the lost MDT-object: parent "DFID
2172                        ", child "DFID", OST-index %u, stripe-index %u, "
2173                        "infix %s, type %s: rc = %d\n",
2174                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2175                        ltd->ltd_index, ea_off, infix, type, rc);
2176
2177         return rc >= 0 ? 1 : rc;
2178 }
2179
2180 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2181                                                    struct lfsck_component *com,
2182                                                    const struct lu_fid *fid,
2183                                                    __u32 index)
2184 {
2185         struct lfsck_thread_info *info  = lfsck_env_info(env);
2186         struct lfsck_request     *lr    = &info->lti_lr;
2187         struct lfsck_instance    *lfsck = com->lc_lfsck;
2188         struct lfsck_tgt_desc    *ltd;
2189         struct ptlrpc_request    *req;
2190         struct lfsck_request     *tmp;
2191         struct obd_export        *exp;
2192         int                       rc    = 0;
2193         ENTRY;
2194
2195         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2196         if (unlikely(ltd == NULL))
2197                 RETURN(-ENXIO);
2198
2199         exp = ltd->ltd_exp;
2200         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2201                 GOTO(put, rc = -EOPNOTSUPP);
2202
2203         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2204         if (req == NULL)
2205                 GOTO(put, rc = -ENOMEM);
2206
2207         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2208         if (rc != 0) {
2209                 ptlrpc_request_free(req);
2210
2211                 GOTO(put, rc);
2212         }
2213
2214         memset(lr, 0, sizeof(*lr));
2215         lr->lr_event = LE_CONDITIONAL_DESTROY;
2216         lr->lr_active = LFSCK_TYPE_LAYOUT;
2217         lr->lr_fid = *fid;
2218
2219         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2220         *tmp = *lr;
2221         ptlrpc_request_set_replen(req);
2222
2223         rc = ptlrpc_queue_wait(req);
2224         ptlrpc_req_finished(req);
2225
2226         GOTO(put, rc);
2227
2228 put:
2229         lfsck_tgt_put(ltd);
2230
2231         return rc;
2232 }
2233
     /**
      * Destroy the OST-object named by lr->lr_fid, but only while it still
      * looks untouched (la_ctime == 0 and no S_ISUID bit in la_mode).
      *
      * The attributes are checked twice: first under the object read lock,
      * before any LDLM lock is taken, and again under the object write lock
      * once an LCK_EX extent lock covering [0, OBD_OBJECT_EOF] is held.
      * NOTE(review): the second check only looks at la_ctime, not S_ISUID.
      *
      * \param[in] env  execution environment
      * \param[in] com  layout LFSCK component; com->lc_lfsck supplies the
      *                 bottom device and the LDLM namespace
      * \param[in] lr   request; lr->lr_fid names the object to destroy
      *
      * \retval         0: the object has been destroyed
      * \retval   -ENOENT: the object does not exist or is already dead
      * \retval  -ETXTBSY: the object failed one of the "untouched" checks
      * \retval      -EIO: the extent lock could not be enqueued
      * \retval other -ve: error from dt_attr_get() or from the transaction
      */
2234 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2235                                                   struct lfsck_component *com,
2236                                                   struct lfsck_request *lr)
2237 {
2238         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2239         struct lu_attr                  *la     = &info->lti_la;
2240         ldlm_policy_data_t              *policy = &info->lti_policy;
2241         struct ldlm_res_id              *resid  = &info->lti_resid;
2242         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2243         struct dt_device                *dev    = lfsck->li_bottom;
2244         struct lu_fid                   *fid    = &lr->lr_fid;
2245         struct dt_object                *obj;
2246         struct thandle                  *th     = NULL;
2247         struct lustre_handle             lh     = { 0 };
2248         __u64                            flags  = 0;
2249         int                              rc     = 0;
2250         ENTRY;
2251
2252         obj = lfsck_object_find_by_dev(env, dev, fid);
2253         if (IS_ERR(obj))
2254                 RETURN(PTR_ERR(obj));
2255
             /* Nothing to destroy if the object is missing or already dead. */
2256         dt_read_lock(env, obj, 0);
2257         if (dt_object_exists(obj) == 0 ||
2258             lfsck_is_dead_obj(obj)) {
2259                 dt_read_unlock(env, obj);
2260
2261                 GOTO(put, rc = -ENOENT);
2262         }
2263
2264         /* First look at the attributes before taking the extent lock. */
2265         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2266         dt_read_unlock(env, obj);
2267         if (rc != 0)
2268                 GOTO(put, rc);
2269
             /* Early filter: a non-zero ctime or the S_ISUID bit means this is
              * not a destroy candidate, so give up without any LDLM lock. */
2270         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2271                 GOTO(put, rc = -ETXTBSY);
2272
2273         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2274         LASSERT(lfsck->li_namespace != NULL);
2275
2276         memset(policy, 0, sizeof(*policy));
2277         policy->l_extent.end = OBD_OBJECT_EOF;
2278         ost_fid_build_resid(fid, resid);
2279         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2280                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2281                                     ldlm_completion_ast, NULL, NULL, 0,
2282                                     LVB_T_NONE, NULL, &lh);
             /* Any enqueue failure is reported to the caller as -EIO. */
2283         if (rc != ELDLM_OK)
2284                 GOTO(put, rc = -EIO);
2285
2286         dt_write_lock(env, obj, 0);
2287         /* Get obj's attr within lock again. */
2288         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2289         if (rc != 0)
2290                 GOTO(unlock, rc);
2291
             /* Re-check under both locks; only la_ctime is tested here. */
2292         if (la->la_ctime != 0)
2293                 GOTO(unlock, rc = -ETXTBSY);
2294
             /* Declare ref_del and destroy, start the local transaction, then
              * execute the two operations in the same order. */
2295         th = dt_trans_create(env, dev);
2296         if (IS_ERR(th))
2297                 GOTO(unlock, rc = PTR_ERR(th));
2298
2299         rc = dt_declare_ref_del(env, obj, th);
2300         if (rc != 0)
2301                 GOTO(stop, rc);
2302
2303         rc = dt_declare_destroy(env, obj, th);
2304         if (rc != 0)
2305                 GOTO(stop, rc);
2306
2307         rc = dt_trans_start_local(env, dev, th);
2308         if (rc != 0)
2309                 GOTO(stop, rc);
2310
2311         rc = dt_ref_del(env, obj, th);
2312         if (rc != 0)
2313                 GOTO(stop, rc);
2314
2315         rc = dt_destroy(env, obj, th);
2316         if (rc == 0)
2317                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2318                        "OST-object "DFID" that was created for reparing "
2319                        "dangling referenced case. But the original missed "
2320                        "OST-object is found now.\n",
2321                        lfsck_lfsck2name(lfsck), PFID(fid));
2322
2323         GOTO(stop, rc);
2324
             /* The labels below release resources in reverse order of
              * acquisition: transaction, write lock + extent lock, object. */
2325 stop:
2326         dt_trans_stop(env, dev, th);
2327
2328 unlock:
2329         dt_write_unlock(env, obj);
2330         ldlm_lock_decref(&lh, LCK_EX);
2331
2332 put:
2333         lu_object_put(env, &obj->do_lu);
2334
2335         return rc;
2336 }
2337
2338 /**
2339  * Some OST-object has occupied the specified layout EA slot.
2340  * Such OST-object may be generated by the LFSCK when repair
2341  * dangling referenced MDT-object, which can be indicated by
2342  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2343  * is true and such OST-object has not been modified yet, we
2344  * will replace it with the orphan OST-object; otherwise the
2345  * LFSCK will create new MDT-object to reference the orphan.
2346  *
      * The FID of the occupying OST-object is rebuilt from \a slot and given
      * to lfsck_layout_master_conditional_destroy() while the layout lock on
      * \a parent is held. On -ETXTBSY the layout lock is dropped,
      * rec->lor_fid is zeroed and lfsck_layout_recreate_parent() takes over;
      * its return value is passed through unchanged. On 0 or -ENOENT the
      * slot is refilled with \a cfid and the LOV EA in \a ea_buf is written
      * back to \a parent with LU_XATTR_REPLACE.
      *
      * \param[in] env          execution environment
      * \param[in] com          layout LFSCK component
      * \param[in] ltd          target descriptor; ltd->ltd_index is stored as
      *                         the OST index of the refilled slot
      * \param[in,out] rec      orphan record; lor_fid is zeroed on the
      *                         recreate-parent path
      * \param[in] parent       MDT-object whose layout EA slot is occupied
      * \param[in] cfid         FID of the orphan OST-object
      * \param[in,out] ea_buf   buffer holding the parent's LOV EA
      * \param[in,out] slot     the occupied slot inside \a ea_buf
      * \param[in] ea_off       stripe index, used for the name infix and
      *                         for logging
      *
      * Every path that reaches the "out" label returns +1 or a negative
      * value; 0 can only come back from lfsck_layout_recreate_parent().
      *
2347  * \retval       +1: repaired
2348  * \retval        0: did nothing
2349  * \retval      -ve: on error
2350  */
2351 static int lfsck_layout_conflict_create(const struct lu_env *env,
2352                                         struct lfsck_component *com,
2353                                         struct lfsck_tgt_desc *ltd,
2354                                         struct lu_orphan_rec *rec,
2355                                         struct dt_object *parent,
2356                                         struct lu_fid *cfid,
2357                                         struct lu_buf *ea_buf,
2358                                         struct lov_ost_data_v1 *slot,
2359                                         __u32 ea_off)
2360 {
2361         struct lfsck_thread_info *info          = lfsck_env_info(env);
2362         struct lu_fid            *cfid2         = &info->lti_fid2;
2363         struct ost_id            *oi            = &info->lti_oi;
2364         char                     *infix         = info->lti_tmpbuf;
2365         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2366         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2367         struct thandle           *th            = NULL;
2368         struct lustre_handle      lh            = { 0 };
2369         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2370         int                       rc            = 0;
2371         ENTRY;
2372
             /* cfid2 is the FID of the object currently sitting in the slot. */
2373         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2374         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2375         if (rc != 0)
2376                 GOTO(out, rc);
2377
2378         /* Hold layout lock on the parent to prevent others to access. */
2379         rc = lfsck_layout_lock(env, com, parent, &lh,
2380                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2381         if (rc != 0)
2382                 GOTO(out, rc);
2383
2384         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2385
2386         /* If the conflict OST-object is not created for fixing dangling
2387          * referenced MDT-object in former LFSCK check/repair, or it has
2388          * been modified by others, then we cannot destroy it. Re-create
2389          * a new MDT-object for the orphan OST-object. */
2390         if (rc == -ETXTBSY) {
2391                 /* No need the layout lock on the original parent. */
2392                 lfsck_layout_unlock(&lh);
2393
                     /* The infix is built from the original parent's FID and
                      * the stripe index. */
2394                 fid_zero(&rec->lor_fid);
2395                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2396                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2397                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2398                                                   infix, "C", ea_off);
2399
2400                 RETURN(rc);
2401         }
2402
             /* -ENOENT is not treated as a failure here; the slot is refilled
              * in the same way as after a successful destroy. */
2403         if (rc != 0 && rc != -ENOENT)
2404                 GOTO(unlock, rc);
2405
2406         th = dt_trans_create(env, dev);
2407         if (IS_ERR(th))
2408                 GOTO(unlock, rc = PTR_ERR(th));
2409
2410         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2411                                   LU_XATTR_REPLACE, th);
2412         if (rc != 0)
2413                 GOTO(stop, rc);
2414
2415         rc = dt_trans_start_local(env, dev, th);
2416         if (rc != 0)
2417                 GOTO(stop, rc);
2418
             /* Bump the layout generation in the buffer, then rewrite the
              * slot and the EA under the parent's write lock. */
2419         dt_write_lock(env, parent, 0);
2420         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2421         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2422                                        LU_XATTR_REPLACE, ltd->ltd_index);
2423         dt_write_unlock(env, parent);
2424
2425         GOTO(stop, rc);
2426
2427 stop:
2428         dt_trans_stop(env, dev, th);
2429
2430 unlock:
2431         lfsck_layout_unlock(&lh);
2432
2433 out:
             /* This message is emitted on failure as well; rc tells which. */
2434         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2435                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2436                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2437                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2438                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2439                ea_off, rc);
2440
             /* Any non-negative result is reported as "repaired". */
2441         return rc >= 0 ? 1 : rc;
2442 }
2443
2444 /**
      * Make slot \a ea_off of \a parent's LOV EA reference the orphan
      * OST-object \a cfid.
      *
      * With the layout lock and the parent's write lock held, the current
      * LOV EA is read and one of the following is done:
      *  - no EA (or an empty one): create it via lfsck_layout_extend_lovea();
      *  - header fails verification with -EINVAL: rebuild it the same way;
      *  - stripe count <= \a ea_off: extend the EA;
      *  - slot \a ea_off is a dummy: refill it;
      *  - slot \a ea_off already holds \a cfid: nothing to do;
      *  - another slot holds \a cfid: update the OST-object's pfid instead;
      *  - otherwise the slot is occupied by another object: drop all locks
      *    and hand over to lfsck_layout_conflict_create().
      *
      * In LPF_DRYRUN mode no transaction is created and every branch that
      * would modify something returns 1 instead.
      *
      * \param[in] env          execution environment
      * \param[in] com          layout LFSCK component
      * \param[in] ltd          target descriptor of the OST holding the orphan
      * \param[in] rec          orphan record, only forwarded to
      *                         lfsck_layout_conflict_create()
      * \param[in] parent       MDT-object whose LOV EA is repaired
      * \param[in] cfid         FID of the orphan OST-object
      * \param[in] ost_idx      OST index recorded in the slot
      * \param[in] ea_off       stripe index of the slot
      *
2445  * \retval       +1: repaired
2446  * \retval        0: did nothing
2447  * \retval      -ve: on error
2448  */
2449 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2450                                        struct lfsck_component *com,
2451                                        struct lfsck_tgt_desc *ltd,
2452                                        struct lu_orphan_rec *rec,
2453                                        struct dt_object *parent,
2454                                        struct lu_fid *cfid,
2455                                        __u32 ost_idx, __u32 ea_off)
2456 {
2457         struct lfsck_thread_info *info          = lfsck_env_info(env);
2458         struct lu_buf            *buf           = &info->lti_big_buf;
2459         struct lu_fid            *fid           = &info->lti_fid2;
2460         struct ost_id            *oi            = &info->lti_oi;
2461         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2462         struct dt_device         *dt            = lfsck->li_bottom;
2463         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2464         struct thandle            *handle       = NULL;
2465         size_t                    lovea_size;
2466         struct lov_mds_md_v1     *lmm;
2467         struct lov_ost_data_v1   *objs;
2468         struct lustre_handle      lh            = { 0 };
2469         __u32                     magic;
2470         int                       fl            = 0;
2471         int                       rc            = 0;
2472         int                       rc1;
2473         int                       i;
2474         __u16                     count;
2475         bool                      locked        = false;
2476         ENTRY;
2477
2478         rc = lfsck_layout_lock(env, com, parent, &lh,
2479                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2480         if (rc != 0) {
2481                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2482                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2483                        "stripe-index %u: rc = %d\n",
2484                        lfsck_lfsck2name(lfsck), PFID(cfid),
2485                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2486
2487                 RETURN(rc);
2488         }
2489
             /* Retry point. rc carries state into this label: a negative
              * value aborts, anything else is the buffer size required for
              * the next attempt (0 on the first pass). The write lock and
              * the transaction of the previous attempt are dropped first. */
2490 again:
2491         if (locked) {
2492                 dt_write_unlock(env, parent);
2493                 locked = false;
2494         }
2495
2496         if (handle != NULL) {
2497                 dt_trans_stop(env, dt, handle);
2498                 handle = NULL;
2499         }
2500
2501         if (rc < 0)
2502                 GOTO(unlock_layout, rc);
2503
2504         lovea_size = rc;
2505         if (buf->lb_len < lovea_size) {
2506                 lu_buf_realloc(buf, lovea_size);
2507                 if (buf->lb_buf == NULL)
2508                         GOTO(unlock_layout, rc = -ENOMEM);
2509         }
2510
             /* Dry-run leaves handle == NULL; the xattr set is declared
              * against the current buffer before the EA is read. */
2511         if (!(bk->lb_param & LPF_DRYRUN)) {
2512                 handle = dt_trans_create(env, dt);
2513                 if (IS_ERR(handle))
2514                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2515
2516                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2517                                           fl, handle);
2518                 if (rc != 0)
2519                         GOTO(stop, rc);
2520
2521                 rc = dt_trans_start_local(env, dt, handle);
2522                 if (rc != 0)
2523                         GOTO(stop, rc);
2524         }
2525
2526         dt_write_lock(env, parent, 0);
2527         locked = true;
2528         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2529         if (rc == -ERANGE) {
                     /* Buffer too small: query with a NULL buffer and retry
                      * with whatever that call returns. */
2530                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2531                                   BYPASS_CAPA);
2532                 LASSERT(rc != 0);
2533                 goto again;
2534         } else if (rc == -ENODATA || rc == 0) {
                     /* No LOV EA yet: a V1 layout with ea_off + 1 slots will
                      * be created. */
2535                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2536                 /* If the declared is not big enough, re-try. */
2537                 if (buf->lb_len < lovea_size) {
2538                         rc = lovea_size;
2539                         goto again;
2540                 }
2541                 fl = LU_XATTR_CREATE;
2542         } else if (rc < 0) {
2543                 GOTO(unlock_parent, rc);
2544         } else if (unlikely(buf->lb_len == 0)) {
                     /* Zero-length buffer: rc is positive here and becomes the
                      * size to allocate on the retry. */
2545                 goto again;
2546         } else {
2547                 fl = LU_XATTR_REPLACE;
2548                 lovea_size = rc;
2549         }
2550
2551         if (fl == LU_XATTR_CREATE) {
2552                 if (bk->lb_param & LPF_DRYRUN)
2553                         GOTO(unlock_parent, rc = 1);
2554
2555                 LASSERT(buf->lb_len >= lovea_size);
2556
2557                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2558                                                buf, fl, ost_idx, ea_off, false);
2559
2560                 GOTO(unlock_parent, rc);
2561         }
2562
2563         lmm = buf->lb_buf;
2564         rc1 = lfsck_layout_verify_header(lmm);
2565
2566         /* If the LOV EA crashed, then rebuild it. */
2567         if (rc1 == -EINVAL) {
2568                 if (bk->lb_param & LPF_DRYRUN)
2569                         GOTO(unlock_parent, rc = 1);
2570
2571                 LASSERT(buf->lb_len >= lovea_size);
2572
2573                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2574                                                buf, fl, ost_idx, ea_off, true);
2575
2576                 GOTO(unlock_parent, rc);
2577         }
2578
2579         /* For other unknown magic/pattern, keep the current LOV EA. */
2580         if (rc1 != 0)
2581                 GOTO(unlock_parent, rc = rc1);
2582
2583         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2584          * been verified in lfsck_layout_verify_header() already. If some
2585          * new magic introduced in the future, then layout LFSCK needs to
2586          * be updated also. */
2587         magic = le32_to_cpu(lmm->lmm_magic);
2588         if (magic == LOV_MAGIC_V1) {
2589                 objs = &lmm->lmm_objects[0];
2590         } else {
2591                 LASSERT(magic == LOV_MAGIC_V3);
2592                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2593         }
2594
2595         count = le16_to_cpu(lmm->lmm_stripe_count);
2596         if (count == 0)
2597                 GOTO(unlock_parent, rc = -EINVAL);
2598         LASSERT(count > 0);
2599
2600         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2601         if (count <= ea_off) {
2602                 if (bk->lb_param & LPF_DRYRUN)
2603                         GOTO(unlock_parent, rc = 1);
2604
2605                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2606                 /* If the declared is not big enough, re-try. */
2607                 if (buf->lb_len < lovea_size) {
2608                         rc = lovea_size;
2609                         goto again;
2610                 }
2611
2612                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2613                                                buf, fl, ost_idx, ea_off, false);
2614
2615                 GOTO(unlock_parent, rc);
2616         }
2617
2618         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2619
             /* Walk every existing slot, looking either for the dummy slot at
              * ea_off or for a slot that already references cfid. */
2620         for (i = 0; i < count; i++, objs++) {
2621                 /* The MDT-object was created via lfsck_layout_recover_create()
2622                  * by others before, and we fill the dummy layout EA. */
2623                 if (lovea_slot_is_dummy(objs)) {
2624                         if (i != ea_off)
2625                                 continue;
2626
2627                         if (bk->lb_param & LPF_DRYRUN)
2628                                 GOTO(unlock_parent, rc = 1);
2629
2630                         lmm->lmm_layout_gen =
2631                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2632                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2633                                                        cfid, buf, objs, fl,
2634                                                        ost_idx);
2635
2636                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2637                                "dummy layout slot for "DFID": parent "DFID
2638                                ", OST-index %u, stripe-index %u: rc = %d\n",
2639                                lfsck_lfsck2name(lfsck), PFID(cfid),
2640                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2641
2642                         GOTO(unlock_parent, rc);
2643                 }
2644
2645                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2646                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2647                 if (rc != 0) {
2648                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2649                                "invalid layout EA at the slot %d, index %u\n",
2650                                lfsck_lfsck2name(lfsck),
2651                                PFID(lfsck_dto2fid(parent)), i,
2652                                le32_to_cpu(objs->l_ost_idx));
2653
2654                         GOTO(unlock_parent, rc);
2655                 }
2656
2657                 /* It should be rare case, the slot is there, but the LFSCK
2658                  * does not handle it during the first-phase cycle scanning. */
2659                 if (unlikely(lu_fid_eq(fid, cfid))) {
2660                         if (i == ea_off) {
2661                                 GOTO(unlock_parent, rc = 0);
2662                         } else {
2663                                 /* Rare case that the OST-object index
2664                                  * does not match the parent MDT-object
2665                                  * layout EA. We trust the later one. */
2666                                 if (bk->lb_param & LPF_DRYRUN)
2667                                         GOTO(unlock_parent, rc = 1);
2668
                                     /* Everything held is released by hand
                                      * here, so this path returns directly
                                      * rather than via the unwind labels. */
2669                                 dt_write_unlock(env, parent);
2670                                 if (handle != NULL)
2671                                         dt_trans_stop(env, dt, handle);
2672                                 lfsck_layout_unlock(&lh);
2673                                 rc = lfsck_layout_update_pfid(env, com, parent,
2674                                                         cfid, ltd->ltd_tgt, i);
2675
2676                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2677                                        "updated OST-object's pfid for "DFID
2678                                        ": parent "DFID", OST-index %u, "
2679                                        "stripe-index %u: rc = %d\n",
2680                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2681                                        PFID(lfsck_dto2fid(parent)),
2682                                        ltd->ltd_index, i, rc);
2683
2684                                 RETURN(rc);
2685                         }
2686                 }
2687         }
2688
2689         /* The MDT-object exists, but related layout EA slot is occupied
2690          * by others. */
2691         if (bk->lb_param & LPF_DRYRUN)
2692                 GOTO(unlock_parent, rc = 1);
2693
             /* Release the write lock, the transaction and the layout lock
              * before calling lfsck_layout_conflict_create(). The slot
              * pointer is recomputed because objs was advanced by the loop. */
2694         dt_write_unlock(env, parent);
2695         if (handle != NULL)
2696                 dt_trans_stop(env, dt, handle);
2697         lfsck_layout_unlock(&lh);
2698         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2699                 objs = &lmm->lmm_objects[ea_off];
2700         else
2701                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2702         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2703                                           buf, objs, ea_off);
2704
2705         RETURN(rc);
2706
2707 unlock_parent:
2708         if (locked)
2709                 dt_write_unlock(env, parent);
2710
2711 stop:
2712         if (handle != NULL)
2713                 dt_trans_stop(env, dt, handle);
2714
2715 unlock_layout:
2716         lfsck_layout_unlock(&lh);
2717
2718         return rc;
2719 }
2720
     /**
      * Handle one orphan OST-object record and account for the result.
      *
      * Depending on the parent FID stored in \a rec:
      *  - zero FID: lfsck_layout_recreate_parent() is called with type "N";
      *  - parent does not exist: the same helper is called with type "R";
      *  - parent is remote: fail with -EXDEV;
      *  - parent is not a regular file: fail with -EISDIR;
      *  - otherwise: lfsck_layout_recreate_lovea() repairs the parent's
      *    LOV EA.
      *
      * The scanned/checked counters are always incremented under lc_sem.
      * A positive result is counted as a repaired orphan and returned as 0;
      * a negative result is counted as a phase2 failure.
      *
      * \param[in] env          execution environment
      * \param[in] com          layout LFSCK component
      * \param[in] ltd          target descriptor of the OST being scanned
      * \param[in,out] rec      orphan record; lor_fid is the claimed parent
      *                         FID and its f_ver is cleared here
      * \param[in] cfid         FID of the orphan OST-object
      *
      * \retval         0: handled (repaired or nothing to do)
      * \retval       -ve: on error
      */
2721 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2722                                         struct lfsck_component *com,
2723                                         struct lfsck_tgt_desc *ltd,
2724                                         struct lu_orphan_rec *rec,
2725                                         struct lu_fid *cfid)
2726 {
2727         struct lfsck_layout     *lo     = com->lc_file_ram;
2728         struct lu_fid           *pfid   = &rec->lor_fid;
2729         struct dt_object        *parent = NULL;
             /* The stripe index is captured before f_ver is cleared below.
              * NOTE(review): presumably f_stripe_idx aliases f_ver; confirm
              * against the lu_fid definition. */
2730         __u32                    ea_off = pfid->f_stripe_idx;
2731         int                      rc     = 0;
2732         ENTRY;
2733
2734         if (!fid_is_sane(cfid))
2735                 GOTO(out, rc = -EINVAL);
2736
2737         if (fid_is_zero(pfid)) {
2738                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2739                                                   "", "N", ea_off);
2740                 GOTO(out, rc);
2741         }
2742
2743         pfid->f_ver = 0;
2744         if (!fid_is_sane(pfid))
2745                 GOTO(out, rc = -EINVAL);
2746
2747         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2748         if (IS_ERR(parent))
2749                 GOTO(out, rc = PTR_ERR(parent));
2750
2751         if (unlikely(dt_object_remote(parent) != 0))
2752                 GOTO(put, rc = -EXDEV);
2753
             /* The reference is dropped here, so this path must skip the
              * "put" label and jump straight to "out". */
2754         if (dt_object_exists(parent) == 0) {
2755                 lu_object_put(env, &parent->do_lu);
2756                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2757                                                   "", "R", ea_off);
2758                 GOTO(out, rc);
2759         }
2760
2761         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2762                 GOTO(put, rc = -EISDIR);
2763
2764         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2765                                          ltd->ltd_index, ea_off);
2766
2767         GOTO(put, rc);
2768
2769 put:
2770         if (rc <= 0)
2771                 lu_object_put(env, &parent->do_lu);
2772         else
2773                 /* The layout EA is changed, need to be reloaded next time. */
2774                 lu_object_put_nocache(env, &parent->do_lu);
2775
2776 out:
             /* Update the statistics; a positive rc is folded into 0. */
2777         down_write(&com->lc_sem);
2778         com->lc_new_scanned++;
2779         com->lc_new_checked++;
2780         if (rc > 0) {
2781                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2782                 rc = 0;
2783         } else if (rc < 0) {
2784                 lo->ll_objs_failed_phase2++;
2785         }
2786         up_write(&com->lc_sem);
2787
2788         return rc;
2789 }
2790
     /**
      * Scan the orphan OST-objects reported by one OST.
      *
      * An index object is looked up on ltd->ltd_tgt using a FID built from
      * FID_SEQ_IDIF, object id 0 and the OST index, and it is set up with
      * dt_lfsck_orphan_features. Each record produced by its iterator is
      * passed to lfsck_layout_scan_orphan_one() together with the key,
      * which is saved in com->lc_fid_latest_scanned_phase2.
      *
      * Errors from a single record, and from advancing the iterator, only
      * stop the scan when LPF_FAILOUT is set in the bookmark parameters.
      *
      * \param[in] env  execution environment
      * \param[in] com  layout LFSCK component
      * \param[in] ltd  target descriptor of the OST to scan
      *
      * \retval         0: scan finished (a positive internal result is
      *                    reported as 0)
      * \retval       -ve: on error; for -ESRCH from the iterator load,
      *                    LF_INCOMPLETE is also set in lo->ll_flags
      */
2791 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2792                                     struct lfsck_component *com,
2793                                     struct lfsck_tgt_desc *ltd)
2794 {
2795         struct lfsck_layout             *lo     = com->lc_file_ram;
2796         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2797         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2798         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2799         struct ost_id                   *oi     = &info->lti_oi;
2800         struct lu_fid                   *fid    = &info->lti_fid;
2801         struct dt_object                *obj;
2802         const struct dt_it_ops          *iops;
2803         struct dt_it                    *di;
2804         int                              rc     = 0;
2805         ENTRY;
2806
2807         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2808                "scanning for OST%04x\n",
2809                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2810
             /* Build the FID of the index object to iterate on this OST. */
2811         ostid_set_seq(oi, FID_SEQ_IDIF);
2812         ostid_set_id(oi, 0);
2813         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2814         if (rc != 0)
2815                 GOTO(log, rc);
2816
2817         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2818         if (unlikely(IS_ERR(obj)))
2819                 GOTO(log, rc = PTR_ERR(obj));
2820
2821         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2822         if (rc != 0)
2823                 GOTO(put, rc);
2824
2825         iops = &obj->do_index_ops->dio_it;
2826         di = iops->init(env, obj, 0, BYPASS_CAPA);
2827         if (IS_ERR(di))
2828                 GOTO(put, rc = PTR_ERR(di));
2829
2830         rc = iops->load(env, di, 0);
2831         if (rc == -ESRCH) {
2832                 /* -ESRCH means that the orphan OST-objects rbtree has been
2833                  * cleanup because of the OSS server restart or other errors. */
2834                 lo->ll_flags |= LF_INCOMPLETE;
2835                 GOTO(fini, rc);
2836         }
2837
             /* After load(): 0 advances once with next() before the loop,
              * a positive value enters the loop directly. */
2838         if (rc == 0)
2839                 rc = iops->next(env, di);
2840         else if (rc > 0)
2841                 rc = 0;
2842
2843         if (rc < 0)
2844                 GOTO(fini, rc);
2845
             /* A positive rc from next() means there is nothing to scan. */
2846         if (rc > 0)
2847                 GOTO(fini, rc = 0);
2848
2849         do {
2850                 struct dt_key           *key;
2851                 struct lu_orphan_rec    *rec = &info->lti_rec;
2852
                     /* Fault injection: wait up to cfs_fail_val seconds, or
                      * until the LFSCK thread is no longer running. */
2853                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2854                     cfs_fail_val > 0) {
2855                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2856                         struct l_wait_info       lwi;
2857
2858                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2859                                           NULL, NULL);
2860                         l_wait_event(thread->t_ctl_waitq,
2861                                      !thread_is_running(thread),
2862                                      &lwi);
2863                 }
2864
                     /* The key is the orphan's FID; it is recorded as the
                      * latest scanned position before it is processed. */
2865                 key = iops->key(env, di);
2866                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2867                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2868                 if (rc == 0)
2869                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2870                                         &com->lc_fid_latest_scanned_phase2);
2871                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2872                         GOTO(fini, rc);
2873
2874                 lfsck_control_speed_by_self(com);
                     /* Without LPF_FAILOUT, next() is retried for as long as
                      * it keeps returning an error. */
2875                 do {
2876                         rc = iops->next(env, di);
2877                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2878         } while (rc == 0);
2879
2880         GOTO(fini, rc);
2881
2882 fini:
2883         iops->put(env, di);
2884         iops->fini(env, di);
2885 put:
2886         lu_object_put(env, &obj->do_lu);
2887
2888 log:
2889         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2890                "scanning for OST%04x: rc = %d\n",
2891                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2892
             /* A positive rc is reported to the caller as success. */
2893         return rc > 0 ? 0 : rc;
2894 }
2895
2896 /* For the MDT-object with dangling reference, we need to repair the
2897  * inconsistency according to the LFSCK sponsor's requirement:
2898  *
2899  * 1) Keep the inconsistency there and report the inconsistency case,
2900  *    then give the chance to the application to find related issues,
2901  *    and the users can make the decision about how to handle it with
2902  *    more human knowledge. (by default)
2903  *
2904  * 2) Re-create the missed OST-object with the FID/owner information.
      *
      * Option 2) is taken only when LPF_CREATE_OSTOBJ is set in the bookmark
      * parameters. The child is then created as a regular file (mode 0666)
      * with the parent's uid/gid, and its XATTR_NAME_FID xattr is set to
      * the parent's FID plus the stripe index llr->llr_lov_idx. Both steps
      * run in one transaction, under the parent's layout lock and read
      * lock.
      *
      * \param[in] env  execution environment
      * \param[in] com  layout LFSCK component
      * \param[in] llr  request naming the parent, the child and its indices
      * \param[in] pla  parent attributes; la_uid/la_gid are copied to the
      *                 child
      *
      * \retval        +1: nothing was created: creation was not requested,
      *                    or the parent is already dead
      * \retval     other: the value returned by lfsck_layout_trans_stop(),
      *                    or a negative error from an earlier step
      */
2905 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2906                                         struct lfsck_component *com,
2907                                         struct lfsck_layout_req *llr,
2908                                         const struct lu_attr *pla)
2909 {
2910         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2911         struct filter_fid               *pfid   = &info->lti_new_pfid;
2912         struct dt_allocation_hint       *hint   = &info->lti_hint;
2913         struct lu_attr                  *cla    = &info->lti_la2;
2914         struct dt_object                *parent = llr->llr_parent->llo_obj;
2915         struct dt_object                *child  = llr->llr_child;
2916         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2917         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2918         struct thandle                  *handle;
2919         struct lu_buf                   *buf;
2920         struct lustre_handle             lh     = { 0 };
2921         int                              rc;
2922         bool                             create;
2923         ENTRY;
2924
2925         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2926                 create = true;
2927         else
2928                 create = false;
2929
             /* Default policy: only report the dangling reference. */
2930         if (!create)
2931                 GOTO(log, rc = 1);
2932
             /* The time fields stay zero from the memset although they are
              * flagged valid in la_valid. */
2933         memset(cla, 0, sizeof(*cla));
2934         cla->la_uid = pla->la_uid;
2935         cla->la_gid = pla->la_gid;
2936         cla->la_mode = S_IFREG | 0666;
2937         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2938                         LA_ATIME | LA_MTIME | LA_CTIME;
2939
2940         rc = lfsck_layout_lock(env, com, parent, &lh,
2941                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2942         if (rc != 0)
2943                 GOTO(log, rc);
2944
2945         handle = dt_trans_create(env, dev);
2946         if (IS_ERR(handle))
2947                 GOTO(unlock1, rc = PTR_ERR(handle));
2948
2949         hint->dah_parent = NULL;
2950         hint->dah_mode = 0;
2951         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2952         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2953         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2954          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2955          * parent MDT-object's layout EA. */
2956         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2957         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2958
2959         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2960         if (rc != 0)
2961                 GOTO(stop, rc);
2962
2963         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2964                                   LU_XATTR_CREATE, handle);
2965         if (rc != 0)
2966                 GOTO(stop, rc);
2967
2968         rc = dt_trans_start(env, dev, handle);
2969         if (rc != 0)
2970                 GOTO(stop, rc);
2971
             /* If the parent died in the meantime, nothing is created. */
2972         dt_read_lock(env, parent, 0);
2973         if (unlikely(lfsck_is_dead_obj(parent)))
2974                 GOTO(unlock2, rc = 1);
2975
2976         rc = dt_create(env, child, cla, hint, NULL, handle);
2977         if (rc != 0)
2978                 GOTO(unlock2, rc);
2979
2980         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2981                           handle, BYPASS_CAPA);
2982
2983         GOTO(unlock2, rc);
2984
2985 unlock2:
2986         dt_read_unlock(env, parent);
2987
2988 stop:
             /* rc is passed in and replaced by the helper's return value. */
2989         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2990
2991 unlock1:
2992         lfsck_layout_unlock(&lh);
2993
2994 log:
2995         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2996                "reference for: parent "DFID", child "DFID", OST-index %u, "
2997                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2998                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2999                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
3000                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
3001                create ? "Create the lost OST-object as required" :
3002                         "Keep the MDT-object there by default", rc);
3003
3004         return rc;
3005 }
3006
3007 /* If the OST-object does not recognize the MDT-object as its parent, and
3008  * there is no other MDT-object claims as its parent, then just trust the
3009  * given MDT-object as its parent. So update the OST-object filter_fid. */
3010 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
3011                                               struct lfsck_component *com,
3012                                               struct lfsck_layout_req *llr,
3013                                               const struct lu_attr *pla)
3014 {
3015         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3016         struct filter_fid               *pfid   = &info->lti_new_pfid;
3017         struct lu_attr                  *tla    = &info->lti_la3;
3018         struct dt_object                *parent = llr->llr_parent->llo_obj;
3019         struct dt_object                *child  = llr->llr_child;
3020         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3021         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
3022         struct thandle                  *handle;
3023         struct lu_buf                   *buf;
3024         struct lustre_handle             lh     = { 0 };
3025         int                              rc;
3026         ENTRY;
3027
3028         rc = lfsck_layout_lock(env, com, parent, &lh,
3029                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3030         if (rc != 0)
3031                 GOTO(log, rc);
3032
3033         handle = dt_trans_create(env, dev);
3034         if (IS_ERR(handle))
3035                 GOTO(unlock1, rc = PTR_ERR(handle));
3036
3037         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3038         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3039         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3040          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3041          * parent MDT-object's layout EA. */
3042         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3043         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3044
3045         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3046         if (rc != 0)
3047                 GOTO(stop, rc);
3048
3049         tla->la_valid = LA_UID | LA_GID;
3050         tla->la_uid = pla->la_uid;
3051         tla->la_gid = pla->la_gid;
3052         rc = dt_declare_attr_set(env, child, tla, handle);
3053         if (rc != 0)
3054                 GOTO(stop, rc);
3055
3056         rc = dt_trans_start(env, dev, handle);
3057         if (rc != 0)
3058                 GOTO(stop, rc);
3059
3060         dt_write_lock(env, parent, 0);
3061         if (unlikely(lfsck_is_dead_obj(parent)))
3062                 GOTO(unlock2, rc = 1);
3063
3064         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3065                           BYPASS_CAPA);
3066         if (rc != 0)
3067                 GOTO(unlock2, rc);
3068
3069         /* Get the latest parent's owner. */
3070         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3071         if (rc != 0)
3072                 GOTO(unlock2, rc);
3073
3074         tla->la_valid = LA_UID | LA_GID;
3075         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3076
3077         GOTO(unlock2, rc);
3078
3079 unlock2:
3080         dt_write_unlock(env, parent);
3081
3082 stop:
3083         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3084
3085 unlock1:
3086         lfsck_layout_unlock(&lh);
3087
3088 log:
3089         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3090                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3091                "stripe-index %u, owner %u/%u: rc = %d\n",
3092                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3093                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3094                pla->la_uid, pla->la_gid, rc);
3095
3096         return rc;
3097 }
3098
3099 /* If there are more than one MDT-objects claim as the OST-object's parent,
3100  * and the OST-object only recognizes one of them, then we need to generate
3101  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3102 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3103                                                    struct lfsck_component *com,
3104                                                    struct lfsck_layout_req *llr,
3105                                                    struct lu_attr *la,
3106                                                    struct lu_buf *buf)
3107 {
3108         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3109         struct dt_allocation_hint       *hint   = &info->lti_hint;
3110         struct dt_object_format         *dof    = &info->lti_dof;
3111         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3112         struct ost_id                   *oi     = &info->lti_oi;
3113         struct dt_object                *parent = llr->llr_parent->llo_obj;
3114         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3115         struct dt_object                *child  = NULL;
3116         struct lu_device                *d      = &cdev->dd_lu_dev;
3117         struct lu_object                *o      = NULL;
3118         struct thandle                  *handle;
3119         struct lov_mds_md_v1            *lmm;
3120         struct lov_ost_data_v1          *objs;
3121         struct lustre_handle             lh     = { 0 };
3122         struct lu_buf                    ea_buf;
3123         __u32                            magic;
3124         int                              rc;
3125         ENTRY;
3126
3127         rc = lfsck_layout_lock(env, com, parent, &lh,
3128                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3129         if (rc != 0)
3130                 GOTO(log, rc);
3131
3132         handle = dt_trans_create(env, pdev);
3133         if (IS_ERR(handle))
3134                 GOTO(unlock1, rc = PTR_ERR(handle));
3135
3136         o = lu_object_anon(env, d, NULL);
3137         if (IS_ERR(o))
3138                 GOTO(stop, rc = PTR_ERR(o));
3139
3140         child = container_of(o, struct dt_object, do_lu);
3141         o = lu_object_locate(o->lo_header, d->ld_type);
3142         if (unlikely(o == NULL))
3143                 GOTO(stop, rc = -EINVAL);
3144
3145         child = container_of(o, struct dt_object, do_lu);
3146         la->la_valid = LA_UID | LA_GID;
3147         hint->dah_parent = NULL;
3148         hint->dah_mode = 0;
3149         dof->dof_type = DFT_REGULAR;
3150         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
3151         if (rc != 0)
3152                 GOTO(stop, rc);
3153
3154         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3155                                   LU_XATTR_REPLACE, handle);
3156         if (rc != 0)
3157                 GOTO(stop, rc);
3158
3159         rc = dt_trans_start(env, pdev, handle);
3160         if (rc != 0)
3161                 GOTO(stop, rc);
3162
3163         dt_write_lock(env, parent, 0);
3164         if (unlikely(lfsck_is_dead_obj(parent)))
3165                 GOTO(unlock2, rc = 0);
3166
3167         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
3168         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
3169                 GOTO(unlock2, rc = 0);
3170
3171         lmm = buf->lb_buf;
3172         /* Someone change layout during the LFSCK, no need to repair then. */
3173         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
3174                 GOTO(unlock2, rc = 0);
3175
3176         rc = dt_create(env, child, la, hint, dof, handle);
3177         if (rc != 0)
3178                 GOTO(unlock2, rc);
3179
3180         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3181          * been verified in lfsck_layout_verify_header() already. If some
3182          * new magic introduced in the future, then layout LFSCK needs to
3183          * be updated also. */
3184         magic = le32_to_cpu(lmm->lmm_magic);
3185         if (magic == LOV_MAGIC_V1) {
3186                 objs = &lmm->lmm_objects[0];
3187         } else {
3188                 LASSERT(magic == LOV_MAGIC_V3);
3189                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3190         }
3191
3192         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
3193         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
3194         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
3195         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
3196         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
3197         lfsck_buf_init(&ea_buf, lmm,
3198                        lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
3199                                        magic));
3200         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
3201                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3202
3203         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
3204
3205 unlock2:
3206         dt_write_unlock(env, parent);
3207
3208 stop:
3209         if (child != NULL)
3210                 lu_object_put(env, &child->do_lu);
3211
3212         dt_trans_stop(env, pdev, handle);
3213
3214 unlock1:
3215         lfsck_layout_unlock(&lh);
3216
3217 log:
3218         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
3219                "references for: parent "DFID", OST-index %u, stripe-index %u, "
3220                "owner %u/%u: rc = %d\n",
3221                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3222                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3223
3224         return rc;
3225 }
3226
3227 /* If the MDT-object and the OST-object have different owner information,
3228  * then trust the MDT-object, because the normal chown/chgrp handle order
3229  * is from MDT to OST, and it is possible that some chown/chgrp operation
3230  * is partly done. */
3231 static int lfsck_layout_repair_owner(const struct lu_env *env,
3232                                      struct lfsck_component *com,
3233                                      struct lfsck_layout_req *llr,
3234                                      struct lu_attr *pla)
3235 {
3236         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3237         struct lu_attr                  *tla    = &info->lti_la3;
3238         struct dt_object                *parent = llr->llr_parent->llo_obj;
3239         struct dt_object                *child  = llr->llr_child;
3240         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3241         struct thandle                  *handle;
3242         int                              rc;
3243         ENTRY;
3244
3245         handle = dt_trans_create(env, dev);
3246         if (IS_ERR(handle))
3247                 GOTO(log, rc = PTR_ERR(handle));
3248
3249         tla->la_uid = pla->la_uid;
3250         tla->la_gid = pla->la_gid;
3251         tla->la_valid = LA_UID | LA_GID;
3252         rc = dt_declare_attr_set(env, child, tla, handle);
3253         if (rc != 0)
3254                 GOTO(stop, rc);
3255
3256         rc = dt_trans_start(env, dev, handle);
3257         if (rc != 0)
3258                 GOTO(stop, rc);
3259
3260         /* Use the dt_object lock to serialize with destroy and attr_set. */
3261         dt_read_lock(env, parent, 0);
3262         if (unlikely(lfsck_is_dead_obj(parent)))
3263                 GOTO(unlock, rc = 1);
3264
3265         /* Get the latest parent's owner. */
3266         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3267         if (rc != 0)
3268                 GOTO(unlock, rc);
3269
3270         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3271         if (unlikely(tla->la_uid != pla->la_uid ||
3272                      tla->la_gid != pla->la_gid))
3273                 GOTO(unlock, rc = 1);
3274
3275         tla->la_valid = LA_UID | LA_GID;
3276         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3277
3278         GOTO(unlock, rc);
3279
3280 unlock:
3281         dt_read_unlock(env, parent);
3282
3283 stop:
3284         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3285
3286 log:
3287         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3288                "file owner for: parent "DFID", child "DFID", OST-index %u, "
3289                "stripe-index %u, owner %u/%u: rc = %d\n",
3290                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3291                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3292                pla->la_uid, pla->la_gid, rc);
3293
3294         return rc;
3295 }
3296
3297 /* Check whether the OST-object correctly back points to the
3298  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3299 static int lfsck_layout_check_parent(const struct lu_env *env,
3300                                      struct lfsck_component *com,
3301                                      struct dt_object *parent,
3302                                      const struct lu_fid *pfid,
3303                                      const struct lu_fid *cfid,
3304                                      const struct lu_attr *pla,
3305                                      const struct lu_attr *cla,
3306                                      struct lfsck_layout_req *llr,
3307                                      struct lu_buf *lov_ea, __u32 idx)
3308 {
3309         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3310         struct lu_buf                   *buf    = &info->lti_big_buf;
3311         struct dt_object                *tobj;
3312         struct lov_mds_md_v1            *lmm;
3313         struct lov_ost_data_v1          *objs;
3314         int                              rc;
3315         int                              i;
3316         __u32                            magic;
3317         __u16                            count;
3318         ENTRY;
3319
3320         if (fid_is_zero(pfid)) {
3321                 /* client never wrote. */
3322                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3323                         if (unlikely(cla->la_uid != pla->la_uid ||
3324                                      cla->la_gid != pla->la_gid))
3325                                 RETURN (LLIT_INCONSISTENT_OWNER);
3326
3327                         RETURN(0);
3328                 }
3329
3330                 RETURN(LLIT_UNMATCHED_PAIR);
3331         }
3332
3333         if (unlikely(!fid_is_sane(pfid)))
3334                 RETURN(LLIT_UNMATCHED_PAIR);
3335
3336         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3337                 if (llr->llr_lov_idx == idx)
3338                         RETURN(0);
3339
3340                 RETURN(LLIT_UNMATCHED_PAIR);
3341         }
3342
3343         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3344         if (IS_ERR(tobj))
3345                 RETURN(PTR_ERR(tobj));
3346
3347         dt_read_lock(env, tobj, 0);
3348         if (dt_object_exists(tobj) == 0 ||
3349             lfsck_is_dead_obj(tobj))
3350                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3351
3352         if (!S_ISREG(lfsck_object_type(tobj)))
3353                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3354
3355         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3356          * remote one on another MDT. Then check whether the given OST-object
3357          * is in such layout. If yes, it is multiple referenced, otherwise it
3358          * is unmatched referenced case. */
3359         rc = lfsck_layout_get_lovea(env, tobj, buf);
3360         if (rc == 0 || rc == -ENOENT)
3361                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3362
3363         if (rc < 0)
3364                 GOTO(out, rc);
3365
3366         lmm = buf->lb_buf;
3367         magic = le32_to_cpu(lmm->lmm_magic);
3368         if (magic == LOV_MAGIC_V1) {
3369                 objs = &lmm->lmm_objects[0];
3370         } else {
3371                 LASSERT(magic == LOV_MAGIC_V3);
3372                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3373         }
3374
3375         count = le16_to_cpu(lmm->lmm_stripe_count);
3376         for (i = 0; i < count; i++, objs++) {
3377                 struct lu_fid           *tfid   = &info->lti_fid2;
3378                 struct ost_id           *oi     = &info->lti_oi;
3379                 __u32                    idx2;
3380
3381                 if (lovea_slot_is_dummy(objs))
3382                         continue;
3383
3384                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3385                 idx2 = le32_to_cpu(objs->l_ost_idx);
3386                 rc = ostid_to_fid(tfid, oi, idx2);
3387                 if (rc != 0) {
3388                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3389                                "invalid layout EA at the slot %d, index %u\n",
3390                                lfsck_lfsck2name(com->lc_lfsck),
3391                                PFID(pfid), i, idx2);
3392
3393                         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3394                 }
3395
3396                 if (lu_fid_eq(cfid, tfid)) {
3397                         *lov_ea = *buf;
3398
3399                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3400                 }
3401         }
3402
3403         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3404
3405 out:
3406         dt_read_unlock(env, tobj);
3407         lfsck_object_put(env, tobj);
3408
3409         return rc;
3410 }
3411
3412 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3413                                              struct lfsck_component *com,
3414                                              struct lfsck_layout_req *llr)
3415 {
3416         struct lfsck_layout                  *lo     = com->lc_file_ram;
3417         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3418         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3419         struct lu_fid                        *pfid   = &info->lti_fid;
3420         struct lu_buf                         buf    = { 0 };
3421         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3422         struct dt_object                     *child  = llr->llr_child;
3423         struct lu_attr                       *pla    = &info->lti_la;
3424         struct lu_attr                       *cla    = &info->lti_la2;
3425         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3426         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3427         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3428         __u32                                 idx    = 0;
3429         int                                   rc;
3430         ENTRY;
3431
3432         if (unlikely(lfsck_is_dead_obj(parent)))
3433                 RETURN(0);
3434
3435         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3436         if (rc != 0)
3437                 GOTO(out, rc);
3438
3439         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3440         if (rc == -ENOENT) {
3441                 if (unlikely(lfsck_is_dead_obj(parent)))
3442                         RETURN(0);
3443
3444                 type = LLIT_DANGLING;
3445                 goto repair;
3446         }
3447
3448         if (rc != 0)
3449                 GOTO(out, rc);
3450
3451         lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3452         rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3453         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3454                      rc != sizeof(struct filter_fid))) {
3455                 type = LLIT_UNMATCHED_PAIR;
3456                 goto repair;
3457         }
3458
3459         if (rc < 0 && rc != -ENODATA)
3460                 GOTO(out, rc);
3461
3462         if (rc == -ENODATA) {
3463                 fid_zero(pfid);
3464         } else {
3465                 fid_le_to_cpu(pfid, &pea->ff_parent);
3466                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3467                  * real parent MDT-object's FID::f_ver, instead it is the
3468                  * OST-object index in its parent MDT-object's layout EA. */
3469                 idx = pfid->f_stripe_idx;
3470                 pfid->f_ver = 0;
3471         }
3472
3473         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3474                                        lu_object_fid(&child->do_lu),
3475                                        pla, cla, llr, &buf, idx);
3476         if (rc > 0) {
3477                 type = rc;
3478                 goto repair;
3479         }
3480
3481         if (rc < 0)
3482                 GOTO(out, rc);
3483
3484         if (unlikely(cla->la_uid != pla->la_uid ||
3485                      cla->la_gid != pla->la_gid)) {
3486                 type = LLIT_INCONSISTENT_OWNER;
3487                 goto repair;
3488         }
3489
3490 repair:
3491         if (bk->lb_param & LPF_DRYRUN) {
3492                 if (type != LLIT_NONE)
3493                         GOTO(out, rc = 1);
3494                 else
3495                         GOTO(out, rc = 0);
3496         }
3497
3498         switch (type) {
3499         case LLIT_DANGLING:
3500                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3501                 break;
3502         case LLIT_UNMATCHED_PAIR:
3503                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3504                 break;
3505         case LLIT_MULTIPLE_REFERENCED:
3506                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3507                                                              pla, &buf);
3508                 break;
3509         case LLIT_INCONSISTENT_OWNER:
3510                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3511                 break;
3512         default:
3513                 rc = 0;
3514                 break;
3515         }
3516
3517         GOTO(out, rc);
3518
3519 out:
3520         down_write(&com->lc_sem);
3521         if (rc < 0) {
3522                 struct lfsck_layout_master_data *llmd = com->lc_data;
3523
3524                 if (unlikely(llmd->llmd_exit)) {
3525                         rc = 0;
3526                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3527                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3528                            rc == -EHOSTUNREACH) {
3529                         /* If cannot touch the target server,
3530                          * mark the LFSCK as INCOMPLETE. */
3531                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3532                                "talk with OST %x: rc = %d\n",
3533                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3534                         lo->ll_flags |= LF_INCOMPLETE;
3535                         lo->ll_objs_skipped++;
3536                         rc = 0;
3537                 } else {
3538                         lfsck_layout_record_failure(env, lfsck, lo);
3539                 }
3540         } else if (rc > 0) {
3541                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3542                          "unknown type = %d\n", type);
3543
3544                 lo->ll_objs_repaired[type - 1]++;
3545                 if (bk->lb_param & LPF_DRYRUN &&
3546                     unlikely(lo->ll_pos_first_inconsistent == 0))
3547                         lo->ll_pos_first_inconsistent =
3548                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3549                                                         lfsck->li_di_oit);
3550         }
3551         up_write(&com->lc_sem);
3552
3553         return rc;
3554 }
3555
3556 static int lfsck_layout_assistant(void *args)
3557 {
3558         struct lfsck_thread_args        *lta     = args;
3559         struct lu_env                   *env     = &lta->lta_env;
3560         struct lfsck_component          *com     = lta->lta_com;
3561         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3562         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3563         struct lfsck_position           *pos     = &com->lc_pos_start;
3564         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3565         struct lfsck_request            *lr      = &info->lti_lr;
3566         struct lfsck_layout_master_data *llmd    = com->lc_data;
3567         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3568         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3569         struct lfsck_layout_req         *llr;
3570         struct l_wait_info               lwi     = { 0 };
3571         int                              rc      = 0;
3572         int                              rc1     = 0;
3573         ENTRY;
3574
3575         memset(lr, 0, sizeof(*lr));
3576         lr->lr_event = LE_START;
3577         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3578                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
3579         lr->lr_speed = bk->lb_speed_limit;
3580         lr->lr_version = bk->lb_version;
3581         lr->lr_param = bk->lb_param;
3582         lr->lr_async_windows = bk->lb_async_windows;
3583         lr->lr_flags = LEF_TO_OST;
3584         if (pos->lp_oit_cookie <= 1)
3585                 lr->lr_param |= LPF_RESET;
3586
3587         rc = lfsck_layout_master_notify_others(env, com, lr);
3588         if (rc != 0) {
3589                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to notify "
3590                        "others for LFSCK start: rc = %d\n",
3591                        lfsck_lfsck2name(lfsck), rc);
3592                 GOTO(fini, rc);
3593         }
3594
3595         spin_lock(&llmd->llmd_lock);
3596         thread_set_flags(athread, SVC_RUNNING);
3597         spin_unlock(&llmd->llmd_lock);
3598         wake_up_all(&mthread->t_ctl_waitq);
3599
3600         while (1) {
3601                 while (!list_empty(&llmd->llmd_req_list)) {
3602                         bool wakeup = false;
3603
3604                         if (unlikely(llmd->llmd_exit ||
3605                                      !thread_is_running(mthread)))
3606                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3607
3608                         llr = list_entry(llmd->llmd_req_list.next,
3609                                          struct lfsck_layout_req,
3610                                          llr_list);
3611                         /* Only the lfsck_layout_assistant thread itself can
3612                          * remove the "llr" from the head of the list, LFSCK
3613                          * engine thread only inserts other new "lld" at the
3614                          * end of the list. So it is safe to handle current
3615                          * "llr" without the spin_lock. */
3616                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3617                         spin_lock(&llmd->llmd_lock);
3618                         list_del_init(&llr->llr_list);
3619                         llmd->llmd_prefetched--;
3620                         /* Wake up the main engine thread only when the list
3621                          * is empty or half of the prefetched items have been
3622                          * handled to avoid too frequent thread schedule. */
3623                         if (llmd->llmd_prefetched == 0 ||
3624                             (bk->lb_async_windows != 0 &&
3625                              bk->lb_async_windows / 2 ==
3626                              llmd->llmd_prefetched))
3627                                 wakeup = true;
3628                         spin_unlock(&llmd->llmd_lock);
3629                         if (wakeup)
3630                                 wake_up_all(&mthread->t_ctl_waitq);
3631
3632                         lfsck_layout_req_fini(env, llr);
3633                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3634                                 GOTO(cleanup1, rc);
3635                 }
3636
3637                 l_wait_event(athread->t_ctl_waitq,
3638                              !lfsck_layout_req_empty(llmd) ||
3639                              llmd->llmd_exit ||
3640                              llmd->llmd_to_post ||
3641                              llmd->llmd_to_double_scan,
3642                              &lwi);
3643
3644                 if (unlikely(llmd->llmd_exit))
3645                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3646
3647                 if (!list_empty(&llmd->llmd_req_list))
3648                         continue;
3649
3650                 if (llmd->llmd_to_post) {
3651                         llmd->llmd_to_post = 0;
3652                         LASSERT(llmd->llmd_post_result > 0);
3653
3654                         memset(lr, 0, sizeof(*lr));
3655                         lr->lr_event = LE_PHASE1_DONE;
3656                         lr->lr_status = llmd->llmd_post_result;
3657                         rc = lfsck_layout_master_notify_others(env, com, lr);
3658                         if (rc != 0)
3659                                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant "
3660                                        "failed to notify others for LFSCK "
3661                                        "post: rc = %d\n",
3662                                        lfsck_lfsck2name(lfsck), rc);
3663
3664                         /* Wakeup the master engine to go ahead. */
3665                         wake_up_all(&mthread->t_ctl_waitq);
3666                 }
3667
3668                 if (llmd->llmd_to_double_scan) {
3669                         llmd->llmd_to_double_scan = 0;
3670                         atomic_inc(&lfsck->li_double_scan_count);
3671                         llmd->llmd_in_double_scan = 1;
3672                         wake_up_all(&mthread->t_ctl_waitq);
3673
3674                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 "
3675                                "scan start\n", lfsck_lfsck2name(lfsck));
3676
3677                         com->lc_new_checked = 0;
3678                         com->lc_new_scanned = 0;
3679                         com->lc_time_last_checkpoint = cfs_time_current();
3680                         com->lc_time_next_checkpoint =
3681                                 com->lc_time_last_checkpoint +
3682                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3683
3684                         /* flush all async updating before handling orphan. */
3685                         dt_sync(env, lfsck->li_next);
3686
3687                         while (llmd->llmd_in_double_scan) {
3688                                 struct lfsck_tgt_descs  *ltds =
3689                                                         &lfsck->li_ost_descs;
3690                                 struct lfsck_tgt_desc   *ltd;
3691
3692                                 rc = lfsck_layout_master_query_others(env, com);
3693                                 if (lfsck_layout_master_to_orphan(llmd))
3694                                         goto orphan;
3695
3696                                 if (rc < 0)
3697                                         GOTO(cleanup2, rc);
3698
3699                                 /* Pull LFSCK status on related targets once
3700                                  * per 30 seconds if we are not notified. */
3701                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3702                                                            cfs_time_seconds(1),
3703                                                            NULL, NULL);
3704                                 rc = l_wait_event(athread->t_ctl_waitq,
3705                                         lfsck_layout_master_to_orphan(llmd) ||
3706                                         llmd->llmd_exit ||
3707                                         !thread_is_running(mthread),
3708                                         &lwi);
3709
3710                                 if (unlikely(llmd->llmd_exit ||
3711                                              !thread_is_running(mthread)))
3712                                         GOTO(cleanup2, rc = 0);
3713
3714                                 if (rc == -ETIMEDOUT)
3715                                         continue;
3716
3717                                 if (rc < 0)
3718                                         GOTO(cleanup2, rc);
3719
3720 orphan:
3721                                 spin_lock(&ltds->ltd_lock);
3722                                 while (!list_empty(
3723                                                 &llmd->llmd_ost_phase2_list)) {
3724                                         ltd = list_entry(
3725                                               llmd->llmd_ost_phase2_list.next,
3726                                               struct lfsck_tgt_desc,
3727                                               ltd_layout_phase_list);
3728                                         list_del_init(
3729                                                 &ltd->ltd_layout_phase_list);
3730                                         spin_unlock(&ltds->ltd_lock);
3731
3732                                         if (bk->lb_param & LPF_ALL_TGT) {
3733                                                 rc = lfsck_layout_scan_orphan(
3734                                                                 env, com, ltd);
3735                                                 if (rc != 0 &&
3736                                                     bk->lb_param & LPF_FAILOUT)
3737                                                         GOTO(cleanup2, rc);
3738                                         }
3739
3740                                         if (unlikely(llmd->llmd_exit ||
3741                                                 !thread_is_running(mthread)))
3742                                                 GOTO(cleanup2, rc = 0);
3743
3744                                         spin_lock(&ltds->ltd_lock);
3745                                 }
3746
3747                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3748                                         spin_unlock(&ltds->ltd_lock);
3749                                         GOTO(cleanup2, rc = 1);
3750                                 }
3751                                 spin_unlock(&ltds->ltd_lock);
3752                         }
3753                 }
3754         }
3755
3756 cleanup1:
3757         /* Cleanup the unfinished requests. */
3758         spin_lock(&llmd->llmd_lock);
3759         if (rc < 0)
3760                 llmd->llmd_assistant_status = rc;
3761
3762         while (!list_empty(&llmd->llmd_req_list)) {
3763                 llr = list_entry(llmd->llmd_req_list.next,
3764                                  struct lfsck_layout_req,
3765                                  llr_list);
3766                 list_del_init(&llr->llr_list);
3767                 llmd->llmd_prefetched--;
3768                 spin_unlock(&llmd->llmd_lock);
3769                 lfsck_layout_req_fini(env, llr);
3770                 spin_lock(&llmd->llmd_lock);
3771         }
3772         spin_unlock(&llmd->llmd_lock);
3773
3774         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3775                  llmd->llmd_prefetched);
3776
3777 cleanup2:
3778         memset(lr, 0, sizeof(*lr));
3779         if (rc > 0) {
3780                 lr->lr_event = LE_PHASE2_DONE;
3781                 lr->lr_status = rc;
3782         } else if (rc == 0) {
3783                 if (lfsck->li_flags & LPF_ALL_TGT) {
3784                         lr->lr_event = LE_STOP;
3785                         lr->lr_status = LS_STOPPED;
3786                 } else {
3787                         lr->lr_event = LE_PEER_EXIT;
3788                         switch (lfsck->li_status) {
3789                         case LS_PAUSED:
3790                         case LS_CO_PAUSED:
3791                                 lr->lr_status = LS_CO_PAUSED;
3792                                 break;
3793                         case LS_STOPPED:
3794                         case LS_CO_STOPPED:
3795                                 lr->lr_status = LS_CO_STOPPED;
3796                                 break;
3797                         default:
3798                                 CDEBUG(D_LFSCK, "%s: unknown status: rc = %d\n",
3799                                        lfsck_lfsck2name(lfsck),
3800                                        lfsck->li_status);
3801                                 lr->lr_status = LS_CO_FAILED;
3802                                 break;
3803                         }
3804                 }
3805         } else {
3806                 if (lfsck->li_flags & LPF_ALL_TGT) {
3807                         lr->lr_event = LE_STOP;
3808                         lr->lr_status = LS_FAILED;
3809                 } else {
3810                         lr->lr_event = LE_PEER_EXIT;
3811                         lr->lr_status = LS_CO_FAILED;
3812                 }
3813         }
3814
3815         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3816         if (rc1 != 0) {
3817                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to "
3818                        "notify others for LFSCK quit: rc = %d\n",
3819                        lfsck_lfsck2name(lfsck), rc1);
3820                 rc = rc1;
3821         }
3822
3823         /* flush all async updating before exit. */
3824         dt_sync(env, lfsck->li_next);
3825
3826         /* Under force exit case, some requests may be just freed without
3827          * verification, those objects should be re-handled when next run.
3828          * So not update the on-disk tracing file under such case. */
3829         if (llmd->llmd_in_double_scan) {
3830                 struct lfsck_layout *lo = com->lc_file_ram;
3831
3832                 if (!llmd->llmd_exit)
3833                         rc1 = lfsck_layout_double_scan_result(env, com, rc);
3834
3835                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 scan "
3836                        "finished, status %d: rc = %d\n",
3837                        lfsck_lfsck2name(lfsck), lo->ll_status, rc1);
3838         }
3839
3840 fini:
3841         if (llmd->llmd_in_double_scan)
3842                 atomic_dec(&lfsck->li_double_scan_count);
3843
3844         spin_lock(&llmd->llmd_lock);
3845         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3846         thread_set_flags(athread, SVC_STOPPED);
3847         wake_up_all(&mthread->t_ctl_waitq);
3848         spin_unlock(&llmd->llmd_lock);
3849         lfsck_thread_args_fini(lta);
3850
3851         return rc;
3852 }
3853
3854 static int
3855 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3856                                    struct ptlrpc_request *req,
3857                                    void *args, int rc)
3858 {
3859         struct lfsck_layout_slave_async_args *llsaa = args;
3860         struct obd_export                    *exp   = llsaa->llsaa_exp;
3861         struct lfsck_component               *com   = llsaa->llsaa_com;
3862         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3863         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3864         struct lfsck_reply                   *lr    = NULL;
3865         bool                                  done  = false;
3866
3867         if (rc != 0) {
3868                 /* It is quite probably caused by target crash,
3869                  * to make the LFSCK can go ahead, assume that
3870                  * the target finished the LFSCK prcoessing. */
3871                 done = true;
3872         } else {
3873                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3874                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3875                     lr->lr_status != LS_SCANNING_PHASE2)
3876                         done = true;
3877         }
3878
3879         if (done) {
3880                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3881                        "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3882                        llst->llst_index, lr != NULL ? lr->lr_status : rc);
3883
3884                 lfsck_layout_llst_del(llsd, llst);
3885         }
3886
3887         lfsck_layout_llst_put(llst);
3888         lfsck_component_put(env, com);
3889         class_export_put(exp);
3890
3891         return 0;
3892 }
3893
3894 static int lfsck_layout_async_query(const struct lu_env *env,
3895                                     struct lfsck_component *com,
3896                                     struct obd_export *exp,
3897                                     struct lfsck_layout_slave_target *llst,
3898                                     struct lfsck_request *lr,
3899                                     struct ptlrpc_request_set *set)
3900 {
3901         struct lfsck_layout_slave_async_args *llsaa;
3902         struct ptlrpc_request                *req;
3903         struct lfsck_request                 *tmp;
3904         int                                   rc;
3905         ENTRY;
3906
3907         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3908         if (req == NULL)
3909                 RETURN(-ENOMEM);
3910
3911         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3912         if (rc != 0) {
3913                 ptlrpc_request_free(req);
3914                 RETURN(rc);
3915         }
3916
3917         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3918         *tmp = *lr;
3919         ptlrpc_request_set_replen(req);
3920
3921         llsaa = ptlrpc_req_async_args(req);
3922         llsaa->llsaa_exp = exp;
3923         llsaa->llsaa_com = lfsck_component_get(com);
3924         llsaa->llsaa_llst = llst;
3925         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3926         ptlrpc_set_add_req(set, req);
3927
3928         RETURN(0);
3929 }
3930
3931 static int lfsck_layout_async_notify(const struct lu_env *env,
3932                                      struct obd_export *exp,
3933                                      struct lfsck_request *lr,
3934                                      struct ptlrpc_request_set *set)
3935 {
3936         struct ptlrpc_request   *req;
3937         struct lfsck_request    *tmp;
3938         int                      rc;
3939         ENTRY;
3940
3941         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3942         if (req == NULL)
3943                 RETURN(-ENOMEM);
3944
3945         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3946         if (rc != 0) {
3947                 ptlrpc_request_free(req);
3948                 RETURN(rc);
3949         }
3950
3951         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3952         *tmp = *lr;
3953         ptlrpc_request_set_replen(req);
3954         ptlrpc_set_add_req(set, req);
3955
3956         RETURN(0);
3957 }
3958
3959 static int
3960 lfsck_layout_slave_query_master(const struct lu_env *env,
3961                                 struct lfsck_component *com)
3962 {
3963         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3964         struct lfsck_instance            *lfsck = com->lc_lfsck;
3965         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3966         struct lfsck_layout_slave_target *llst;
3967         struct obd_export                *exp;
3968         struct ptlrpc_request_set        *set;
3969         int                               rc    = 0;
3970         int                               rc1   = 0;
3971         ENTRY;
3972
3973         set = ptlrpc_prep_set();
3974         if (set == NULL)
3975                 GOTO(log, rc = -ENOMEM);
3976
3977         memset(lr, 0, sizeof(*lr));
3978         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3979         lr->lr_event = LE_QUERY;
3980         lr->lr_active = LFSCK_TYPE_LAYOUT;
3981
3982         llsd->llsd_touch_gen++;
3983         spin_lock(&llsd->llsd_lock);
3984         while (!list_empty(&llsd->llsd_master_list)) {
3985                 llst = list_entry(llsd->llsd_master_list.next,
3986                                   struct lfsck_layout_slave_target,
3987                                   llst_list);
3988                 if (llst->llst_gen == llsd->llsd_touch_gen)
3989                         break;
3990
3991                 llst->llst_gen = llsd->llsd_touch_gen;
3992                 list_move_tail(&llst->llst_list,
3993                                &llsd->llsd_master_list);
3994                 atomic_inc(&llst->llst_ref);
3995                 spin_unlock(&llsd->llsd_lock);
3996
3997                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3998                                                llst->llst_index);
3999                 if (exp == NULL) {
4000                         lfsck_layout_llst_del(llsd, llst);
4001                         lfsck_layout_llst_put(llst);
4002                         spin_lock(&llsd->llsd_lock);
4003                         continue;
4004                 }
4005
4006                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
4007                 if (rc != 0) {
4008                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4009                                "query %s for layout: rc = %d\n",
4010                                lfsck_lfsck2name(lfsck),
4011                                exp->exp_obd->obd_name, rc);
4012
4013                         rc1 = rc;
4014                         lfsck_layout_llst_put(llst);
4015                         class_export_put(exp);
4016                 }
4017                 spin_lock(&llsd->llsd_lock);
4018         }
4019         spin_unlock(&llsd->llsd_lock);
4020
4021         rc = ptlrpc_set_wait(set);
4022         ptlrpc_set_destroy(set);
4023
4024         GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
4025
4026 log:
4027         CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
4028                lfsck_lfsck2name(com->lc_lfsck), rc);
4029
4030         return rc;
4031 }
4032
4033 static void
4034 lfsck_layout_slave_notify_master(const struct lu_env *env,
4035                                  struct lfsck_component *com,
4036                                  enum lfsck_events event, int result)
4037 {
4038         struct lfsck_instance            *lfsck = com->lc_lfsck;
4039         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4040         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
4041         struct lfsck_layout_slave_target *llst;
4042         struct obd_export                *exp;
4043         struct ptlrpc_request_set        *set;
4044         int                               rc;
4045         ENTRY;
4046
4047         CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
4048                lfsck_lfsck2name(com->lc_lfsck));
4049
4050         set = ptlrpc_prep_set();
4051         if (set == NULL)
4052                 RETURN_EXIT;
4053
4054         memset(lr, 0, sizeof(*lr));
4055         lr->lr_event = event;
4056         lr->lr_flags = LEF_FROM_OST;
4057         lr->lr_status = result;
4058         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4059         lr->lr_active = LFSCK_TYPE_LAYOUT;
4060         llsd->llsd_touch_gen++;
4061         spin_lock(&llsd->llsd_lock);
4062         while (!list_empty(&llsd->llsd_master_list)) {
4063                 llst = list_entry(llsd->llsd_master_list.next,
4064                                   struct lfsck_layout_slave_target,
4065                                   llst_list);
4066                 if (llst->llst_gen == llsd->llsd_touch_gen)
4067                         break;
4068
4069                 llst->llst_gen = llsd->llsd_touch_gen;
4070                 list_move_tail(&llst->llst_list,
4071                                &llsd->llsd_master_list);
4072                 atomic_inc(&llst->llst_ref);
4073                 spin_unlock(&llsd->llsd_lock);
4074
4075                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
4076                                                llst->llst_index);
4077                 if (exp == NULL) {
4078                         lfsck_layout_llst_del(llsd, llst);
4079                         lfsck_layout_llst_put(llst);
4080                         spin_lock(&llsd->llsd_lock);
4081                         continue;
4082                 }
4083
4084                 rc = lfsck_layout_async_notify(env, exp, lr, set);
4085                 if (rc != 0)
4086                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4087                                "notify %s for layout: rc = %d\n",
4088                                lfsck_lfsck2name(lfsck),
4089                                exp->exp_obd->obd_name, rc);
4090
4091                 lfsck_layout_llst_put(llst);
4092                 class_export_put(exp);
4093                 spin_lock(&llsd->llsd_lock);
4094         }
4095         spin_unlock(&llsd->llsd_lock);
4096
4097         ptlrpc_set_wait(set);
4098         ptlrpc_set_destroy(set);
4099
4100         RETURN_EXIT;
4101 }
4102
4103 /*
4104  * \ret -ENODATA: unrecognized stripe
4105  * \ret = 0     : recognized stripe
4106  * \ret < 0     : other failures
4107  */
4108 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
4109                                            struct lfsck_component *com,
4110                                            struct lu_fid *cfid,
4111                                            struct lu_fid *pfid)
4112 {
4113         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4114         struct lu_buf                   *buf    = &info->lti_big_buf;
4115         struct ost_id                   *oi     = &info->lti_oi;
4116         struct dt_object                *obj;
4117         struct lov_mds_md_v1            *lmm;
4118         struct lov_ost_data_v1          *objs;
4119         __u32                            idx    = pfid->f_stripe_idx;
4120         __u32                            magic;
4121         int                              rc     = 0;
4122         int                              i;
4123         __u16                            count;
4124         ENTRY;
4125
4126         pfid->f_ver = 0;
4127         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
4128         if (IS_ERR(obj))
4129                 RETURN(PTR_ERR(obj));
4130
4131         dt_read_lock(env, obj, 0);
4132         if (unlikely(dt_object_exists(obj) == 0 ||
4133                      lfsck_is_dead_obj(obj)))
4134                 GOTO(unlock, rc = -ENOENT);
4135
4136         if (!S_ISREG(lfsck_object_type(obj)))
4137                 GOTO(unlock, rc = -ENODATA);
4138
4139         rc = lfsck_layout_get_lovea(env, obj, buf);
4140         if (rc < 0)
4141                 GOTO(unlock, rc);
4142
4143         if (rc == 0)
4144                 GOTO(unlock, rc = -ENODATA);
4145
4146         lmm = buf->lb_buf;
4147         rc = lfsck_layout_verify_header(lmm);
4148         if (rc != 0)
4149                 GOTO(unlock, rc);
4150
4151         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4152          * been verified in lfsck_layout_verify_header() already. If some
4153          * new magic introduced in the future, then layout LFSCK needs to
4154          * be updated also. */
4155         magic = le32_to_cpu(lmm->lmm_magic);
4156         if (magic == LOV_MAGIC_V1) {
4157                 objs = &lmm->lmm_objects[0];
4158         } else {
4159                 LASSERT(magic == LOV_MAGIC_V3);
4160                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4161         }
4162
4163         fid_to_ostid(cfid, oi);
4164         count = le16_to_cpu(lmm->lmm_stripe_count);
4165         for (i = 0; i < count; i++, objs++) {
4166                 struct ost_id oi2;
4167
4168                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
4169                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
4170                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
4171         }
4172
4173         GOTO(unlock, rc = -ENODATA);
4174
4175 unlock:
4176         dt_read_unlock(env, obj);
4177         lu_object_put(env, &obj->do_lu);
4178
4179         return rc;
4180 }
4181
4182 /*
4183  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
4184  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
4185  * layout EA from MDT to OST. On one hand, the OST no need to understand
4186  * the layout EA structure; on the other hand, it may cause trouble when
4187  * transfer large layout EA from MDT to OST via normal OUT RPC.
4188  *
4189  * \ret > 0: unrecognized stripe
4190  * \ret = 0: recognized stripe
4191  * \ret < 0: other failures
4192  */
4193 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
4194                                           struct lfsck_component *com,
4195                                           struct lu_fid *cfid,
4196                                           struct lu_fid *pfid)
4197 {
4198         struct lfsck_instance    *lfsck  = com->lc_lfsck;
4199         struct obd_device        *obd    = lfsck->li_obd;
4200         struct seq_server_site   *ss     =
4201                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4202         struct obd_export        *exp    = NULL;
4203         struct ptlrpc_request    *req    = NULL;
4204         struct lfsck_request     *lr;
4205         struct lu_seq_range       range  = { 0 };
4206         int                       rc     = 0;
4207         ENTRY;
4208
4209         if (unlikely(fid_is_idif(pfid)))
4210                 RETURN(1);
4211
4212         fld_range_set_any(&range);
4213         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
4214         if (rc != 0)
4215                 RETURN(rc == -ENOENT ? 1 : rc);
4216
4217         if (unlikely(!fld_range_is_mdt(&range)))
4218                 RETURN(1);
4219
4220         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
4221         if (unlikely(exp == NULL))
4222                 RETURN(1);
4223
4224         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
4225                 GOTO(out, rc = -EOPNOTSUPP);
4226
4227         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
4228         if (req == NULL)
4229                 GOTO(out, rc = -ENOMEM);
4230
4231         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
4232         if (rc != 0) {
4233                 ptlrpc_request_free(req);
4234
4235                 GOTO(out, rc);
4236         }
4237
4238         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
4239         memset(lr, 0, sizeof(*lr));
4240         lr->lr_event = LE_PAIRS_VERIFY;
4241         lr->lr_active = LFSCK_TYPE_LAYOUT;
4242         lr->lr_fid = *cfid; /* OST-object itself FID. */
4243         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
4244
4245         ptlrpc_request_set_replen(req);
4246         rc = ptlrpc_queue_wait(req);
4247         ptlrpc_req_finished(req);
4248
4249         if (rc == -ENOENT || rc == -ENODATA)
4250                 rc = 1;
4251
4252         GOTO(out, rc);
4253
4254 out:
4255         if (exp != NULL)
4256                 class_export_put(exp);
4257
4258         return rc;
4259 }
4260
4261 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
4262                                           struct lfsck_component *com,
4263                                           struct lfsck_request *lr)
4264 {
4265         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4266         struct filter_fid               *ff     = &info->lti_new_pfid;
4267         struct lu_buf                   *buf;
4268         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
4269         struct dt_object                *obj;
4270         struct thandle                  *th     = NULL;
4271         int                              rc     = 0;
4272         ENTRY;
4273
4274         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
4275         if (IS_ERR(obj))
4276                 GOTO(log, rc = PTR_ERR(obj));
4277
4278         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
4279         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4280         dt_write_lock(env, obj, 0);
4281         if (unlikely(dt_object_exists(obj) == 0 ||
4282                      lfsck_is_dead_obj(obj)))
4283                 GOTO(unlock, rc = 0);
4284
4285         th = dt_trans_create(env, dev);
4286         if (IS_ERR(th))
4287                 GOTO(unlock, rc = PTR_ERR(th));
4288
4289         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4290         if (rc != 0)
4291                 GOTO(stop, rc);
4292
4293         rc = dt_trans_start_local(env, dev, th);
4294         if (rc != 0)
4295                 GOTO(stop, rc);
4296
4297         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4298
4299         GOTO(stop, rc);
4300
4301 stop:
4302         dt_trans_stop(env, dev, th);
4303
4304 unlock:
4305         dt_write_unlock(env, obj);
4306         lu_object_put(env, &obj->do_lu);
4307
4308 log:
4309         CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
4310                ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
4311                PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
4312
4313         return rc;
4314 }
4315
4316 /* layout APIs */
4317
4318 static int lfsck_layout_reset(const struct lu_env *env,
4319                               struct lfsck_component *com, bool init)
4320 {
4321         struct lfsck_layout     *lo    = com->lc_file_ram;
4322         int                      rc;
4323
4324         down_write(&com->lc_sem);
4325         if (init) {
4326                 memset(lo, 0, com->lc_file_size);
4327         } else {
4328                 __u32 count = lo->ll_success_count;
4329                 __u64 last_time = lo->ll_time_last_complete;
4330
4331                 memset(lo, 0, com->lc_file_size);
4332                 lo->ll_success_count = count;
4333                 lo->ll_time_last_complete = last_time;
4334         }
4335
4336         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4337         lo->ll_status = LS_INIT;
4338
4339         rc = lfsck_layout_store(env, com);
4340         up_write(&com->lc_sem);
4341
4342         CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
4343                lfsck_lfsck2name(com->lc_lfsck), rc);
4344
4345         return rc;
4346 }
4347
4348 static void lfsck_layout_fail(const struct lu_env *env,
4349                               struct lfsck_component *com, bool new_checked)
4350 {
4351         struct lfsck_layout *lo = com->lc_file_ram;
4352
4353         down_write(&com->lc_sem);
4354         if (new_checked)
4355                 com->lc_new_checked++;
4356         lfsck_layout_record_failure(env, com->lc_lfsck, lo);
4357         up_write(&com->lc_sem);
4358 }
4359
4360 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4361                                           struct lfsck_component *com, bool init)
4362 {
4363         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4364         struct lfsck_layout             *lo      = com->lc_file_ram;
4365         struct lfsck_layout_master_data *llmd    = com->lc_data;
4366         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4367         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4368         struct l_wait_info               lwi     = { 0 };
4369         int                              rc;
4370
4371         if (com->lc_new_checked == 0 && !init)
4372                 return 0;
4373
4374         l_wait_event(mthread->t_ctl_waitq,
4375                      list_empty(&llmd->llmd_req_list) ||
4376                      !thread_is_running(mthread) ||
4377                      thread_is_stopped(athread),
4378                      &lwi);
4379
4380         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4381                 return 0;
4382
4383         down_write(&com->lc_sem);
4384         if (init) {
4385                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4386         } else {
4387                 lo->ll_pos_last_checkpoint =
4388                                         lfsck->li_pos_current.lp_oit_cookie;
4389                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4390                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4391                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4392                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4393                 com->lc_new_checked = 0;
4394         }
4395
4396         rc = lfsck_layout_store(env, com);
4397         up_write(&com->lc_sem);
4398
4399         CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
4400                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4401                lfsck->li_pos_current.lp_oit_cookie, rc);
4402
4403         return rc;
4404 }
4405
4406 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4407                                          struct lfsck_component *com, bool init)
4408 {
4409         struct lfsck_instance   *lfsck = com->lc_lfsck;
4410         struct lfsck_layout     *lo    = com->lc_file_ram;
4411         int                      rc;
4412
4413         if (com->lc_new_checked == 0 && !init)
4414                 return 0;
4415
4416         down_write(&com->lc_sem);
4417         if (init) {
4418                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4419         } else {
4420                 lo->ll_pos_last_checkpoint =
4421                                         lfsck->li_pos_current.lp_oit_cookie;
4422                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4423                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4424                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4425                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4426                 com->lc_new_checked = 0;
4427         }
4428
4429         rc = lfsck_layout_store(env, com);
4430         up_write(&com->lc_sem);
4431
4432         CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4433                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4434                lfsck->li_pos_current.lp_oit_cookie, rc);
4435
4436         return rc;
4437 }
4438
4439 static int lfsck_layout_prep(const struct lu_env *env,
4440                              struct lfsck_component *com,
4441                              struct lfsck_start *start)
4442 {
4443         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4444         struct lfsck_layout     *lo     = com->lc_file_ram;
4445         struct lfsck_position   *pos    = &com->lc_pos_start;
4446
4447         fid_zero(&pos->lp_dir_parent);
4448         pos->lp_dir_cookie = 0;
4449         if (lo->ll_status == LS_COMPLETED ||
4450             lo->ll_status == LS_PARTIAL ||
4451             /* To handle orphan, must scan from the beginning. */
4452             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4453                 int rc;
4454
4455                 rc = lfsck_layout_reset(env, com, false);
4456                 if (rc == 0)
4457                         rc = lfsck_set_param(env, lfsck, start, true);
4458
4459                 if (rc != 0) {
4460                         CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4461                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4462
4463                         return rc;
4464                 }
4465         }
4466
4467         down_write(&com->lc_sem);
4468         lo->ll_time_latest_start = cfs_time_current_sec();
4469         spin_lock(&lfsck->li_lock);
4470         if (lo->ll_flags & LF_SCANNED_ONCE) {
4471                 if (!lfsck->li_drop_dryrun ||
4472                     lo->ll_pos_first_inconsistent == 0) {
4473                         lo->ll_status = LS_SCANNING_PHASE2;
4474                         list_move_tail(&com->lc_link,
4475                                        &lfsck->li_list_double_scan);
4476                         pos->lp_oit_cookie = 0;
4477                 } else {
4478                         int i;
4479
4480                         lo->ll_status = LS_SCANNING_PHASE1;
4481                         lo->ll_run_time_phase1 = 0;
4482                         lo->ll_run_time_phase2 = 0;
4483                         lo->ll_objs_checked_phase1 = 0;
4484                         lo->ll_objs_checked_phase2 = 0;
4485                         lo->ll_objs_failed_phase1 = 0;
4486                         lo->ll_objs_failed_phase2 = 0;
4487                         for (i = 0; i < LLIT_MAX; i++)
4488                                 lo->ll_objs_repaired[i] = 0;
4489
4490                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4491                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4492                 }
4493         } else {
4494                 lo->ll_status = LS_SCANNING_PHASE1;
4495                 if (!lfsck->li_drop_dryrun ||
4496                     lo->ll_pos_first_inconsistent == 0)
4497                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4498                 else
4499                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4500         }
4501         spin_unlock(&lfsck->li_lock);
4502         up_write(&com->lc_sem);
4503
4504         return 0;
4505 }
4506
4507 static int lfsck_layout_slave_prep(const struct lu_env *env,
4508                                    struct lfsck_component *com,
4509                                    struct lfsck_start_param *lsp)
4510 {
4511         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4512         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4513         struct lfsck_layout             *lo     = com->lc_file_ram;
4514         struct lfsck_start              *start  = lsp->lsp_start;
4515         int                              rc;
4516
4517         rc = lfsck_layout_prep(env, com, start);
4518         if (rc != 0)
4519                 return rc;
4520
4521         if (lo->ll_flags & LF_CRASHED_LASTID &&
4522             list_empty(&llsd->llsd_master_list)) {
4523                 LASSERT(lfsck->li_out_notify != NULL);
4524
4525                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4526                                      LE_LASTID_REBUILDING);
4527         }
4528
4529         if (!lsp->lsp_index_valid)
4530                 return 0;
4531
4532         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4533         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4534                 LASSERT(!llsd->llsd_rbtree_valid);
4535
4536                 write_lock(&llsd->llsd_rb_lock);
4537                 rc = lfsck_rbtree_setup(env, com);
4538                 write_unlock(&llsd->llsd_rb_lock);
4539         }
4540
4541         CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4542                LPU64"]\n", lfsck_lfsck2name(lfsck),
4543                com->lc_pos_start.lp_oit_cookie);
4544
4545         return rc;
4546 }
4547
4548 static int lfsck_layout_master_prep(const struct lu_env *env,
4549                                     struct lfsck_component *com,
4550                                     struct lfsck_start_param *lsp)
4551 {
4552         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4553         struct lfsck_layout_master_data *llmd    = com->lc_data;
4554         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4555         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4556         struct lfsck_thread_args        *lta;
4557         struct task_struct              *task;
4558         int                              rc;
4559         ENTRY;
4560
4561         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4562         if (rc != 0)
4563                 RETURN(rc);
4564
4565         llmd->llmd_assistant_status = 0;
4566         llmd->llmd_post_result = 0;
4567         llmd->llmd_to_post = 0;
4568         llmd->llmd_to_double_scan = 0;
4569         llmd->llmd_in_double_scan = 0;
4570         llmd->llmd_exit = 0;
4571         thread_set_flags(athread, 0);
4572
4573         lta = lfsck_thread_args_init(lfsck, com, lsp);
4574         if (IS_ERR(lta))
4575                 RETURN(PTR_ERR(lta));
4576
4577         task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout");
4578         if (IS_ERR(task)) {
4579                 rc = PTR_ERR(task);
4580                 CERROR("%s: cannot start LFSCK layout assistant thread: "
4581                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4582                 lfsck_thread_args_fini(lta);
4583         } else {
4584                 struct l_wait_info lwi = { 0 };
4585
4586                 l_wait_event(mthread->t_ctl_waitq,
4587                              thread_is_running(athread) ||
4588                              thread_is_stopped(athread),
4589                              &lwi);
4590                 if (unlikely(!thread_is_running(athread)))
4591                         rc = llmd->llmd_assistant_status;
4592                 else
4593                         rc = 0;
4594         }
4595
4596         CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4597                LPU64"\n", lfsck_lfsck2name(lfsck),
4598                com->lc_pos_start.lp_oit_cookie);
4599
4600         RETURN(rc);
4601 }
4602
4603 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4604 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4605                                      struct lfsck_component *com,
4606                                      struct dt_object *parent,
4607                                      struct lov_mds_md_v1 *lmm)
4608 {
4609         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4610         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4611         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4612         struct lfsck_layout             *lo      = com->lc_file_ram;
4613         struct lfsck_layout_master_data *llmd    = com->lc_data;
4614         struct lfsck_layout_object      *llo     = NULL;
4615         struct lov_ost_data_v1          *objs;
4616         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4617         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4618         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4619                 struct l_wait_info       lwi     = { 0 };
4620         struct lu_buf                    buf;
4621         int                              rc      = 0;
4622         int                              i;
4623         __u32                            magic;
4624         __u16                            count;
4625         __u16                            gen;
4626         ENTRY;
4627
4628         lfsck_buf_init(&buf, &info->lti_old_pfid,
4629                        sizeof(struct filter_fid_old));
4630         count = le16_to_cpu(lmm->lmm_stripe_count);
4631         gen = le16_to_cpu(lmm->lmm_layout_gen);
4632         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4633          * been verified in lfsck_layout_verify_header() already. If some
4634          * new magic introduced in the future, then layout LFSCK needs to
4635          * be updated also. */
4636         magic = le32_to_cpu(lmm->lmm_magic);
4637         if (magic == LOV_MAGIC_V1) {
4638                 objs = &lmm->lmm_objects[0];
4639         } else {
4640                 LASSERT(magic == LOV_MAGIC_V3);
4641                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4642         }
4643
4644         for (i = 0; i < count; i++, objs++) {
4645                 struct lu_fid           *fid    = &info->lti_fid;
4646                 struct ost_id           *oi     = &info->lti_oi;
4647                 struct lfsck_layout_req *llr;
4648                 struct lfsck_tgt_desc   *tgt    = NULL;
4649                 struct dt_object        *cobj   = NULL;
4650                 __u32                    index;
4651                 bool                     wakeup = false;
4652
4653                 if (unlikely(lovea_slot_is_dummy(objs)))
4654                         continue;
4655
4656                 l_wait_event(mthread->t_ctl_waitq,
4657                              bk->lb_async_windows == 0 ||
4658                              llmd->llmd_prefetched < bk->lb_async_windows ||
4659                              !thread_is_running(mthread) ||
4660                              thread_is_stopped(athread),
4661                              &lwi);
4662
4663                 if (unlikely(!thread_is_running(mthread)) ||
4664                              thread_is_stopped(athread))
4665                         GOTO(out, rc = 0);
4666
4667                 if (unlikely(lfsck_is_dead_obj(parent)))
4668                         GOTO(out, rc = 0);
4669
4670                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4671                 index = le32_to_cpu(objs->l_ost_idx);
4672                 rc = ostid_to_fid(fid, oi, index);
4673                 if (rc != 0) {
4674                         CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
4675                                ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
4676                                PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
4677                         goto next;
4678                 }
4679
4680                 tgt = lfsck_tgt_get(ltds, index);
4681                 if (unlikely(tgt == NULL)) {
4682                         CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4683                                "did not join the layout LFSCK\n",
4684                                lfsck_lfsck2name(lfsck), index);
4685                         lo->ll_flags |= LF_INCOMPLETE;
4686                         goto next;
4687                 }
4688
4689                 /* There is potential deadlock race condition between object
4690                  * destroy and layout LFSCK. Consider the following scenario:
4691                  *
4692                  * 1) The LFSCK thread obtained the parent object firstly, at
4693                  *    that time, the parent object has not been destroyed yet.
4694                  *
4695                  * 2) One RPC service thread destroyed the parent and all its
4696                  *    children objects. Because the LFSCK is referencing the
4697                  *    parent object, then the parent object will be marked as
4698                  *    dying in RAM. On the other hand, the parent object is
4699                  *    referencing all its children objects, then all children
4700                  *    objects will be marked as dying in RAM also.
4701                  *
4702                  * 3) The LFSCK thread tries to find some child object with
4703                  *    the parent object referenced. Then it will find that the
4704                  *    child object is dying. According to the object visibility
4705                  *    rules: the object with dying flag cannot be returned to
4706                  *    others. So the LFSCK thread has to wait until the dying
4707                  *    object has been purged from RAM, then it can allocate a
4708                  *    new object (with the same FID) in RAM. Unfortunately, the
4709                  *    LFSCK thread itself is referencing the parent object, and
4710                  *    cause the parent object cannot be purged, then cause the
4711                  *    child object cannot be purged also. So the LFSCK thread
4712                  *    will fall into deadlock.
4713                  *
4714                  * We introduce non-blocked version lu_object_find() to allow
4715                  * the LFSCK thread to return failure immediately (instead of
4716                  * wait) when it finds dying (child) object, then the LFSCK
4717                  * thread can check whether the parent object is dying or not.
4718                  * So avoid above deadlock. LU-5395 */
4719                 cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
4720                 if (IS_ERR(cobj)) {
4721                         if (lfsck_is_dead_obj(parent)) {
4722                                 lfsck_tgt_put(tgt);
4723
4724                                 GOTO(out, rc = 0);
4725                         }
4726
4727                         rc = PTR_ERR(cobj);
4728                         goto next;
4729                 }
4730
4731                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4732                 if (rc != 0)
4733                         goto next;
4734
4735                 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
4736                                           BYPASS_CAPA);
4737                 if (rc != 0)
4738                         goto next;
4739
4740                 if (llo == NULL) {
4741                         llo = lfsck_layout_object_init(env, parent, gen);
4742                         if (IS_ERR(llo)) {
4743                                 rc = PTR_ERR(llo);
4744                                 goto next;
4745                         }
4746                 }
4747
4748                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4749                 if (IS_ERR(llr)) {
4750                         rc = PTR_ERR(llr);
4751                         goto next;
4752                 }
4753
4754                 cobj = NULL;
4755                 spin_lock(&llmd->llmd_lock);
4756                 if (llmd->llmd_assistant_status < 0) {
4757                         spin_unlock(&llmd->llmd_lock);
4758                         lfsck_layout_req_fini(env, llr);
4759                         lfsck_tgt_put(tgt);
4760                         RETURN(llmd->llmd_assistant_status);
4761                 }
4762
4763                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4764                 if (llmd->llmd_prefetched == 0)
4765                         wakeup = true;
4766
4767                 llmd->llmd_prefetched++;
4768                 spin_unlock(&llmd->llmd_lock);
4769                 if (wakeup)
4770                         wake_up_all(&athread->t_ctl_waitq);
4771
4772 next:
4773                 down_write(&com->lc_sem);
4774                 com->lc_new_checked++;
4775                 if (rc < 0)
4776                         lfsck_layout_record_failure(env, lfsck, lo);
4777                 up_write(&com->lc_sem);
4778
4779                 if (cobj != NULL && !IS_ERR(cobj))
4780                         lu_object_put(env, &cobj->do_lu);
4781
4782                 if (likely(tgt != NULL))
4783                         lfsck_tgt_put(tgt);
4784
4785                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4786                         GOTO(out, rc);
4787         }
4788
4789         GOTO(out, rc = 0);
4790
4791 out:
4792         if (llo != NULL && !IS_ERR(llo))
4793                 lfsck_layout_object_put(env, llo);
4794
4795         return rc;
4796 }
4797
4798 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4799  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4800  * list ::llmd_req_list.
4801  *
4802  * For each request on above list, the lfsck_layout_assistant thread compares
4803  * the OST side attribute with local attribute, if inconsistent, then repair it.
4804  *
4805  * All above processing is async mode with pipeline. */
4806 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4807                                         struct lfsck_component *com,
4808                                         struct dt_object *obj)
4809 {
4810         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4811         struct ost_id                   *oi     = &info->lti_oi;
4812         struct lfsck_layout             *lo     = com->lc_file_ram;
4813         struct lfsck_layout_master_data *llmd   = com->lc_data;
4814         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4815         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4816         struct thandle                  *handle = NULL;
4817         struct lu_buf                   *buf    = &info->lti_big_buf;
4818         struct lov_mds_md_v1            *lmm    = NULL;
4819         struct dt_device                *dev    = lfsck->li_bottom;
4820         struct lustre_handle             lh     = { 0 };
4821         struct lu_buf                    ea_buf = { 0 };
4822         int                              rc     = 0;
4823         int                              size   = 0;
4824         bool                             locked = false;
4825         bool                             stripe = false;
4826         bool                             bad_oi = false;
4827         ENTRY;
4828
4829         if (!S_ISREG(lfsck_object_type(obj)))
4830                 GOTO(out, rc = 0);
4831
4832         if (llmd->llmd_assistant_status < 0)
4833                 GOTO(out, rc = -ESRCH);
4834
4835         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4836         lmm_oi_cpu_to_le(oi, oi);
4837         dt_read_lock(env, obj, 0);
4838         locked = true;
4839
4840 again:
4841         if (dt_object_exists(obj) == 0 ||
4842             lfsck_is_dead_obj(obj))
4843                 GOTO(out, rc = 0);
4844
4845         rc = lfsck_layout_get_lovea(env, obj, buf);
4846         if (rc <= 0)
4847                 GOTO(out, rc);
4848
4849         size = rc;
4850         lmm = buf->lb_buf;
4851         rc = lfsck_layout_verify_header(lmm);
4852         /* If the LOV EA crashed, then it is possible to be rebuilt later
4853          * when handle orphan OST-objects. */
4854         if (rc != 0)
4855                 GOTO(out, rc);
4856
4857         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4858                 GOTO(out, stripe = true);
4859
4860         /* Inconsistent lmm_oi, should be repaired. */
4861         bad_oi = true;
4862         lmm->lmm_oi = *oi;
4863
4864         if (bk->lb_param & LPF_DRYRUN) {
4865                 down_write(&com->lc_sem);
4866                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4867                 up_write(&com->lc_sem);
4868
4869                 GOTO(out, stripe = true);
4870         }
4871
4872         if (!lustre_handle_is_used(&lh)) {
4873                 dt_read_unlock(env, obj);
4874                 locked = false;
4875                 rc = lfsck_layout_lock(env, com, obj, &lh,
4876                                        MDS_INODELOCK_LAYOUT |
4877                                        MDS_INODELOCK_XATTR);
4878                 if (rc != 0)
4879                         GOTO(out, rc);
4880
4881                 handle = dt_trans_create(env, dev);
4882                 if (IS_ERR(handle))
4883                         GOTO(out, rc = PTR_ERR(handle));
4884
4885                 lfsck_buf_init(&ea_buf, lmm, size);
4886                 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4887                                           LU_XATTR_REPLACE, handle);
4888                 if (rc != 0)
4889                         GOTO(out, rc);
4890
4891                 rc = dt_trans_start_local(env, dev, handle);
4892                 if (rc != 0)
4893                         GOTO(out, rc);
4894
4895                 dt_write_lock(env, obj, 0);
4896                 locked = true;
4897
4898                 goto again;
4899         }
4900
4901         rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4902                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4903         if (rc != 0)
4904                 GOTO(out, rc);
4905
4906         down_write(&com->lc_sem);
4907         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4908         up_write(&com->lc_sem);
4909
4910         GOTO(out, stripe = true);
4911
4912 out:
4913         if (locked) {
4914                 if (lustre_handle_is_used(&lh))
4915                         dt_write_unlock(env, obj);
4916                 else
4917                         dt_read_unlock(env, obj);
4918         }
4919
4920         if (handle != NULL && !IS_ERR(handle))
4921                 dt_trans_stop(env, dev, handle);
4922
4923         lfsck_layout_unlock(&lh);
4924
4925         if (bad_oi)
4926                 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4927                        DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4928                        bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4929                        PFID(lfsck_dto2fid(obj)), rc);
4930
4931         if (stripe) {
4932                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4933         } else {
4934                 down_write(&com->lc_sem);
4935                 com->lc_new_checked++;
4936                 if (rc < 0)
4937                         lfsck_layout_record_failure(env, lfsck, lo);
4938                 up_write(&com->lc_sem);
4939         }
4940
4941         return rc;
4942 }
4943
4944 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4945                                        struct lfsck_component *com,
4946                                        struct dt_object *obj)
4947 {
4948         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4949         struct lfsck_layout             *lo     = com->lc_file_ram;
4950         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4951         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4952         struct lfsck_layout_seq         *lls;
4953         __u64                            seq;
4954         __u64                            oid;
4955         int                              rc;
4956         ENTRY;
4957
4958         LASSERT(llsd != NULL);
4959
4960         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4961             cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4962                 struct l_wait_info       lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4963                                                            NULL, NULL);
4964                 struct ptlrpc_thread    *thread = &lfsck->li_thread;
4965
4966                 l_wait_event(thread->t_ctl_waitq,
4967                              !thread_is_running(thread),
4968                              &lwi);
4969         }
4970
4971         lfsck_rbtree_update_bitmap(env, com, fid, false);
4972
4973         down_write(&com->lc_sem);
4974         if (fid_is_idif(fid))
4975                 seq = 0;
4976         else if (!fid_is_norm(fid) ||
4977                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4978                 GOTO(unlock, rc = 0);
4979         else
4980                 seq = fid_seq(fid);
4981         com->lc_new_checked++;
4982
4983         lls = lfsck_layout_seq_lookup(llsd, seq);
4984         if (lls == NULL) {
4985                 OBD_ALLOC_PTR(lls);
4986                 if (unlikely(lls == NULL))
4987                         GOTO(unlock, rc = -ENOMEM);
4988
4989                 INIT_LIST_HEAD(&lls->lls_list);
4990                 lls->lls_seq = seq;
4991                 rc = lfsck_layout_lastid_load(env, com, lls);
4992                 if (rc != 0) {
4993                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4994                               "load LAST_ID for "LPX64": rc = %d\n",
4995                               lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4996                         lo->ll_objs_failed_phase1++;
4997                         OBD_FREE_PTR(lls);
4998                         GOTO(unlock, rc);
4999                 }
5000
5001                 lfsck_layout_seq_insert(llsd, lls);
5002         }
5003
5004         if (unlikely(fid_is_last_id(fid)))
5005                 GOTO(unlock, rc = 0);
5006
5007         if (fid_is_idif(fid))
5008                 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
5009         else
5010                 oid = fid_oid(fid);
5011
5012         if (oid > lls->lls_lastid_known)
5013                 lls->lls_lastid_known = oid;
5014
5015         if (oid > lls->lls_lastid) {
5016                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
5017                         /* OFD may create new objects during LFSCK scanning. */
5018                         rc = lfsck_layout_lastid_reload(env, com, lls);
5019                         if (unlikely(rc != 0)) {
5020                                 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
5021                                       "reload LAST_ID for "LPX64": rc = %d\n",
5022                                       lfsck_lfsck2name(com->lc_lfsck),
5023                                       lls->lls_seq, rc);
5024
5025                                 GOTO(unlock, rc);
5026                         }
5027
5028                         if (oid <= lls->lls_lastid ||
5029                             lo->ll_flags & LF_CRASHED_LASTID)
5030                                 GOTO(unlock, rc = 0);
5031
5032                         LASSERT(lfsck->li_out_notify != NULL);
5033
5034                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5035                                              LE_LASTID_REBUILDING);
5036                         lo->ll_flags |= LF_CRASHED_LASTID;
5037
5038                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
5039                                "LAST_ID file (2) for the sequence "LPX64
5040                                ", old value "LPU64", known value "LPU64"\n",
5041                                lfsck_lfsck2name(lfsck), lls->lls_seq,
5042                                lls->lls_lastid, oid);
5043                 }
5044
5045                 lls->lls_lastid = oid;
5046                 lls->lls_dirty = 1;
5047         }
5048
5049         GOTO(unlock, rc = 0);
5050
5051 unlock:
5052         up_write(&com->lc_sem);
5053
5054         return rc;
5055 }
5056
/* Directory entry hook of the layout LFSCK.
 *
 * Nothing is done with the given directory entry; all the arguments are
 * ignored.
 *
 * \retval              0 always
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        int rc = 0;

        return rc;
}
5064
5065 static int lfsck_layout_master_post(const struct lu_env *env,
5066                                     struct lfsck_component *com,
5067                                     int result, bool init)
5068 {
5069         struct lfsck_instance           *lfsck   = com->lc_lfsck;
5070         struct lfsck_layout             *lo      = com->lc_file_ram;
5071         struct lfsck_layout_master_data *llmd    = com->lc_data;
5072         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
5073         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5074         struct l_wait_info               lwi     = { 0 };
5075         int                              rc;
5076         ENTRY;
5077
5078
5079         llmd->llmd_post_result = result;
5080         llmd->llmd_to_post = 1;
5081         if (llmd->llmd_post_result <= 0)
5082                 llmd->llmd_exit = 1;
5083
5084         wake_up_all(&athread->t_ctl_waitq);
5085         l_wait_event(mthread->t_ctl_waitq,
5086                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
5087                      thread_is_stopped(athread),
5088                      &lwi);
5089
5090         if (llmd->llmd_assistant_status < 0)
5091                 result = llmd->llmd_assistant_status;
5092
5093         down_write(&com->lc_sem);
5094         spin_lock(&lfsck->li_lock);
5095         /* When LFSCK failed, there may be some prefetched objects those are
5096          * not been processed yet, we do not know the exactly position, then
5097          * just restart from last check-point next time. */
5098         if (!init && !llmd->llmd_exit)
5099                 lo->ll_pos_last_checkpoint =
5100                                         lfsck->li_pos_current.lp_oit_cookie;
5101
5102         if (result > 0) {
5103                 lo->ll_status = LS_SCANNING_PHASE2;
5104                 lo->ll_flags |= LF_SCANNED_ONCE;
5105                 lo->ll_flags &= ~LF_UPGRADE;
5106                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5107         } else if (result == 0) {
5108                 lo->ll_status = lfsck->li_status;
5109                 if (lo->ll_status == 0)
5110                         lo->ll_status = LS_STOPPED;
5111                 if (lo->ll_status != LS_PAUSED) {
5112                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5113                 }
5114         } else {
5115                 lo->ll_status = LS_FAILED;
5116                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5117         }
5118         spin_unlock(&lfsck->li_lock);
5119
5120         if (!init) {
5121                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5122                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5123                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5124                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5125                 com->lc_new_checked = 0;
5126         }
5127
5128         rc = lfsck_layout_store(env, com);
5129         up_write(&com->lc_sem);
5130
5131         CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
5132                lfsck_lfsck2name(lfsck), rc);
5133
5134         RETURN(rc);
5135 }
5136
5137 static int lfsck_layout_slave_post(const struct lu_env *env,
5138                                    struct lfsck_component *com,
5139                                    int result, bool init)
5140 {
5141         struct lfsck_instance   *lfsck = com->lc_lfsck;
5142         struct lfsck_layout     *lo    = com->lc_file_ram;
5143         int                      rc;
5144         bool                     done  = false;
5145
5146         rc = lfsck_layout_lastid_store(env, com);
5147         if (rc != 0)
5148                 result = rc;
5149
5150         LASSERT(lfsck->li_out_notify != NULL);
5151
5152         down_write(&com->lc_sem);
5153         spin_lock(&lfsck->li_lock);
5154         if (!init)
5155                 lo->ll_pos_last_checkpoint =
5156                                         lfsck->li_pos_current.lp_oit_cookie;
5157         if (result > 0) {
5158                 lo->ll_status = LS_SCANNING_PHASE2;
5159                 lo->ll_flags |= LF_SCANNED_ONCE;
5160                 if (lo->ll_flags & LF_CRASHED_LASTID) {
5161                         done = true;
5162                         lo->ll_flags &= ~LF_CRASHED_LASTID;
5163
5164                         CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
5165                                "crashed LAST_ID files successfully\n",
5166                                lfsck_lfsck2name(lfsck));
5167                 }
5168                 lo->ll_flags &= ~LF_UPGRADE;
5169                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5170         } else if (result == 0) {
5171                 lo->ll_status = lfsck->li_status;
5172                 if (lo->ll_status == 0)
5173                         lo->ll_status = LS_STOPPED;
5174                 if (lo->ll_status != LS_PAUSED)
5175                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5176         } else {
5177                 lo->ll_status = LS_FAILED;
5178                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5179         }
5180         spin_unlock(&lfsck->li_lock);
5181
5182         if (done)
5183                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5184                                      LE_LASTID_REBUILT);
5185
5186         if (!init) {
5187                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5188                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5189                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5190                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5191                 com->lc_new_checked = 0;
5192         }
5193
5194         rc = lfsck_layout_store(env, com);
5195         up_write(&com->lc_sem);
5196
5197         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
5198
5199         if (result <= 0)
5200                 lfsck_rbtree_cleanup(env, com);
5201
5202         CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
5203                lfsck_lfsck2name(lfsck), rc);
5204
5205         return rc;
5206 }
5207
5208 static int lfsck_layout_dump(const struct lu_env *env,
5209                              struct lfsck_component *com, struct seq_file *m)
5210 {
5211         struct lfsck_instance   *lfsck = com->lc_lfsck;
5212         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
5213         struct lfsck_layout     *lo    = com->lc_file_ram;
5214         int                      rc;
5215
5216         down_read(&com->lc_sem);
5217         seq_printf(m, "name: lfsck_layout\n"
5218                       "magic: %#x\n"
5219                       "version: %d\n"
5220                       "status: %s\n",
5221                       lo->ll_magic,
5222                       bk->lb_version,
5223                       lfsck_status2names(lo->ll_status));
5224
5225         rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
5226         if (rc < 0)
5227                 goto out;
5228
5229         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
5230         if (rc < 0)
5231                 goto out;
5232
5233         rc = lfsck_time_dump(m, lo->ll_time_last_complete,
5234                              "time_since_last_completed");
5235         if (rc < 0)
5236                 goto out;
5237
5238         rc = lfsck_time_dump(m, lo->ll_time_latest_start,
5239                              "time_since_latest_start");
5240         if (rc < 0)
5241                 goto out;
5242
5243         rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
5244                              "time_since_last_checkpoint");
5245         if (rc < 0)
5246                 goto out;
5247
5248         seq_printf(m, "latest_start_position: "LPU64"\n"
5249                       "last_checkpoint_position: "LPU64"\n"
5250                       "first_failure_position: "LPU64"\n",
5251                       lo->ll_pos_latest_start,
5252                       lo->ll_pos_last_checkpoint,
5253                       lo->ll_pos_first_inconsistent);
5254
5255         seq_printf(m, "success_count: %u\n"
5256                       "repaired_dangling: "LPU64"\n"
5257                       "repaired_unmatched_pair: "LPU64"\n"
5258                       "repaired_multiple_referenced: "LPU64"\n"
5259                       "repaired_orphan: "LPU64"\n"
5260                       "repaired_inconsistent_owner: "LPU64"\n"
5261                       "repaired_others: "LPU64"\n"
5262                       "skipped: "LPU64"\n"
5263                       "failed_phase1: "LPU64"\n"
5264                       "failed_phase2: "LPU64"\n",
5265                       lo->ll_success_count,
5266                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
5267                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
5268                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
5269                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
5270                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
5271                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
5272                       lo->ll_objs_skipped,
5273                       lo->ll_objs_failed_phase1,
5274                       lo->ll_objs_failed_phase2);
5275
5276         if (lo->ll_status == LS_SCANNING_PHASE1) {
5277                 __u64 pos;
5278                 const struct dt_it_ops *iops;
5279                 cfs_duration_t duration = cfs_time_current() -
5280                                           lfsck->li_time_last_checkpoint;
5281                 __u64 checked = lo->ll_objs_checked_phase1 +
5282                                 com->lc_new_checked;
5283                 __u64 speed = checked;
5284                 __u64 new_checked = com->lc_new_checked * HZ;
5285                 __u32 rtime = lo->ll_run_time_phase1 +
5286                               cfs_duration_sec(duration + HALF_SEC);
5287
5288                 if (duration != 0)
5289                         do_div(new_checked, duration);
5290                 if (rtime != 0)
5291                         do_div(speed, rtime);
5292                 seq_printf(m, "checked_phase1: "LPU64"\n"
5293                               "checked_phase2: "LPU64"\n"
5294                               "run_time_phase1: %u seconds\n"
5295                               "run_time_phase2: %u seconds\n"
5296                               "average_speed_phase1: "LPU64" items/sec\n"
5297                               "average_speed_phase2: N/A\n"
5298                               "real-time_speed_phase1: "LPU64" items/sec\n"
5299                               "real-time_speed_phase2: N/A\n",
5300                               checked,
5301                               lo->ll_objs_checked_phase2,
5302                               rtime,
5303                               lo->ll_run_time_phase2,
5304                               speed,
5305                               new_checked);
5306
5307                 LASSERT(lfsck->li_di_oit != NULL);
5308
5309                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
5310
5311                 /* The low layer otable-based iteration position may NOT
5312                  * exactly match the layout-based directory traversal
5313                  * cookie. Generally, it is not a serious issue. But the
5314                  * caller should NOT make assumption on that. */
5315                 pos = iops->store(env, lfsck->li_di_oit);
5316                 if (!lfsck->li_current_oit_processed)
5317                         pos--;
5318                 seq_printf(m, "current_position: "LPU64"\n", pos);
5319
5320         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
5321                 cfs_duration_t duration = cfs_time_current() -
5322                                           lfsck->li_time_last_checkpoint;
5323                 __u64 checked = lo->ll_objs_checked_phase2 +
5324                                 com->lc_new_checked;
5325                 __u64 speed1 = lo->ll_objs_checked_phase1;
5326                 __u64 speed2 = checked;
5327                 __u64 new_checked = com->lc_new_checked * HZ;
5328                 __u32 rtime = lo->ll_run_time_phase2 +
5329                               cfs_duration_sec(duration + HALF_SEC);
5330
5331                 if (duration != 0)
5332                         do_div(new_checked, duration);
5333                 if (lo->ll_run_time_phase1 != 0)
5334                         do_div(speed1, lo->ll_run_time_phase1);
5335                 if (rtime != 0)
5336                         do_div(speed2, rtime);
5337                 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
5338                                 "checked_phase2: "LPU64"\n"
5339                                 "run_time_phase1: %u seconds\n"
5340                                 "run_time_phase2: %u seconds\n"
5341                                 "average_speed_phase1: "LPU64" items/sec\n"
5342                                 "average_speed_phase2: "LPU64" items/sec\n"
5343                                 "real-time_speed_phase1: N/A\n"
5344                                 "real-time_speed_phase2: "LPU64" items/sec\n"
5345                                 "current_position: "DFID"\n",
5346                                 lo->ll_objs_checked_phase1,
5347                                 checked,
5348                                 lo->ll_run_time_phase1,
5349                                 rtime,
5350                                 speed1,
5351                                 speed2,
5352                                 new_checked,
5353                                 PFID(&com->lc_fid_latest_scanned_phase2));
5354                 if (rc <= 0)
5355                         goto out;
5356
5357         } else {
5358                 __u64 speed1 = lo->ll_objs_checked_phase1;
5359                 __u64 speed2 = lo->ll_objs_checked_phase2;
5360
5361                 if (lo->ll_run_time_phase1 != 0)
5362                         do_div(speed1, lo->ll_run_time_phase1);
5363                 if (lo->ll_run_time_phase2 != 0)
5364                         do_div(speed2, lo->ll_run_time_phase2);
5365                 seq_printf(m, "checked_phase1: "LPU64"\n"
5366                            "checked_phase2: "LPU64"\n"
5367                            "run_time_phase1: %u seconds\n"
5368                            "run_time_phase2: %u seconds\n"
5369                            "average_speed_phase1: "LPU64" items/sec\n"
5370                            "average_speed_phase2: "LPU64" objs/sec\n"
5371                            "real-time_speed_phase1: N/A\n"
5372                            "real-time_speed_phase2: N/A\n"
5373                            "current_position: N/A\n",
5374                            lo->ll_objs_checked_phase1,
5375                            lo->ll_objs_checked_phase2,
5376                            lo->ll_run_time_phase1,
5377                            lo->ll_run_time_phase2,
5378                            speed1,
5379                            speed2);
5380         }
5381 out:
5382         up_read(&com->lc_sem);
5383
5384         return rc;
5385 }
5386
5387 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5388                                            struct lfsck_component *com)
5389 {
5390         struct lfsck_layout_master_data *llmd    = com->lc_data;
5391         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5392         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5393         struct lfsck_layout             *lo      = com->lc_file_ram;
5394         struct l_wait_info               lwi     = { 0 };
5395
5396         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5397                 return 0;
5398
5399         llmd->llmd_to_double_scan = 1;
5400         wake_up_all(&athread->t_ctl_waitq);
5401         l_wait_event(mthread->t_ctl_waitq,
5402                      llmd->llmd_in_double_scan ||
5403                      thread_is_stopped(athread),
5404                      &lwi);
5405         if (llmd->llmd_assistant_status < 0)
5406                 return llmd->llmd_assistant_status;
5407
5408         return 0;
5409 }
5410
5411 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5412                                           struct lfsck_component *com)
5413 {
5414         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5415         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5416         struct lfsck_layout             *lo     = com->lc_file_ram;
5417         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5418         int                              rc;
5419         ENTRY;
5420
5421         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5422                 lfsck_rbtree_cleanup(env, com);
5423                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5424                 RETURN(0);
5425         }
5426
5427         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
5428                lfsck_lfsck2name(lfsck));
5429
5430         atomic_inc(&lfsck->li_double_scan_count);
5431
5432         com->lc_new_checked = 0;
5433         com->lc_new_scanned = 0;
5434         com->lc_time_last_checkpoint = cfs_time_current();
5435         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5436                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5437
5438         while (1) {
5439                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5440                                                      NULL, NULL);
5441
5442                 rc = lfsck_layout_slave_query_master(env, com);
5443                 if (list_empty(&llsd->llsd_master_list)) {
5444                         if (unlikely(!thread_is_running(thread)))
5445                                 rc = 0;
5446                         else
5447                                 rc = 1;
5448
5449                         GOTO(done, rc);
5450                 }
5451
5452                 if (rc < 0)
5453                         GOTO(done, rc);
5454
5455                 rc = l_wait_event(thread->t_ctl_waitq,
5456                                   !thread_is_running(thread) ||
5457                                   list_empty(&llsd->llsd_master_list),
5458                                   &lwi);
5459                 if (unlikely(!thread_is_running(thread)))
5460                         GOTO(done, rc = 0);
5461
5462                 if (rc == -ETIMEDOUT)
5463                         continue;
5464
5465                 GOTO(done, rc = (rc < 0 ? rc : 1));
5466         }
5467
5468 done:
5469         rc = lfsck_layout_double_scan_result(env, com, rc);
5470
5471         lfsck_rbtree_cleanup(env, com);
5472         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5473         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5474                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5475
5476         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5477                "status %d: rc = %d\n",
5478                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
5479
5480         return rc;
5481 }
5482
5483 static void lfsck_layout_master_data_release(const struct lu_env *env,
5484                                              struct lfsck_component *com)
5485 {
5486         struct lfsck_layout_master_data *llmd   = com->lc_data;
5487         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5488         struct lfsck_tgt_descs          *ltds;
5489         struct lfsck_tgt_desc           *ltd;
5490         struct lfsck_tgt_desc           *next;
5491
5492         LASSERT(llmd != NULL);
5493         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5494                 thread_is_stopped(&llmd->llmd_thread));
5495         LASSERT(list_empty(&llmd->llmd_req_list));
5496
5497         com->lc_data = NULL;
5498
5499         ltds = &lfsck->li_ost_descs;
5500         spin_lock(&ltds->ltd_lock);
5501         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5502                                  ltd_layout_phase_list) {
5503                 list_del_init(&ltd->ltd_layout_phase_list);
5504         }
5505         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5506                                  ltd_layout_phase_list) {
5507                 list_del_init(&ltd->ltd_layout_phase_list);
5508         }
5509         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5510                                  ltd_layout_list) {
5511                 list_del_init(&ltd->ltd_layout_list);
5512         }
5513         spin_unlock(&ltds->ltd_lock);
5514
5515         ltds = &lfsck->li_mdt_descs;
5516         spin_lock(&ltds->ltd_lock);
5517         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5518                                  ltd_layout_phase_list) {
5519                 list_del_init(&ltd->ltd_layout_phase_list);
5520         }
5521         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5522                                  ltd_layout_phase_list) {
5523                 list_del_init(&ltd->ltd_layout_phase_list);
5524         }
5525         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5526                                  ltd_layout_list) {
5527                 list_del_init(&ltd->ltd_layout_list);
5528         }
5529         spin_unlock(&ltds->ltd_lock);
5530
5531         OBD_FREE_PTR(llmd);
5532 }
5533
5534 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5535                                             struct lfsck_component *com)
5536 {
5537         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5538         struct lfsck_layout_seq          *lls;
5539         struct lfsck_layout_seq          *next;
5540         struct lfsck_layout_slave_target *llst;
5541         struct lfsck_layout_slave_target *tmp;
5542
5543         LASSERT(llsd != NULL);
5544
5545         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5546                                      lls_list) {
5547                 list_del_init(&lls->lls_list);
5548                 lfsck_object_put(env, lls->lls_lastid_obj);
5549                 OBD_FREE_PTR(lls);
5550         }
5551
5552         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5553                                  llst_list) {
5554                 list_del_init(&llst->llst_list);
5555                 OBD_FREE_PTR(llst);
5556         }
5557
5558         lfsck_rbtree_cleanup(env, com);
5559         com->lc_data = NULL;
5560         OBD_FREE_PTR(llsd);
5561 }
5562
5563 static void lfsck_layout_master_quit(const struct lu_env *env,
5564                                      struct lfsck_component *com)
5565 {
5566         struct lfsck_layout_master_data *llmd    = com->lc_data;
5567         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5568         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5569         struct l_wait_info               lwi     = { 0 };
5570
5571         llmd->llmd_exit = 1;
5572         wake_up_all(&athread->t_ctl_waitq);
5573         l_wait_event(mthread->t_ctl_waitq,
5574                      thread_is_init(athread) ||
5575                      thread_is_stopped(athread),
5576                      &lwi);
5577 }
5578
/**
 * Quit handler of the slave-side layout LFSCK component.
 *
 * The only work done here is cleaning up the rbtree of the component.
 *
 * \param[in] env       execution environment
 * \param[in] com       layout LFSCK component
 */
static void
lfsck_layout_slave_quit(const struct lu_env *env, struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
5584
5585 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5586                                          struct lfsck_component *com,
5587                                          struct lfsck_request *lr)
5588 {
5589         struct lfsck_instance           *lfsck = com->lc_lfsck;
5590         struct lfsck_layout             *lo    = com->lc_file_ram;
5591         struct lfsck_layout_master_data *llmd  = com->lc_data;
5592         struct lfsck_tgt_descs          *ltds;
5593         struct lfsck_tgt_desc           *ltd;
5594         bool                             fail  = false;
5595         ENTRY;
5596
5597         if (lr->lr_event == LE_PAIRS_VERIFY) {
5598                 int rc;
5599
5600                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5601                                                      &lr->lr_fid2);
5602
5603                 RETURN(rc);
5604         }
5605
5606         CDEBUG(D_LFSCK, "%s: layout LFSCK master handle notify %u "
5607                "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
5608                lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5609                lr->lr_index, lr->lr_status);
5610
5611         if (lr->lr_event != LE_PHASE1_DONE &&
5612             lr->lr_event != LE_PHASE2_DONE &&
5613             lr->lr_event != LE_PEER_EXIT)
5614                 RETURN(-EINVAL);
5615
5616         if (lr->lr_flags & LEF_FROM_OST)
5617                 ltds = &lfsck->li_ost_descs;
5618         else
5619                 ltds = &lfsck->li_mdt_descs;
5620         spin_lock(&ltds->ltd_lock);
5621         ltd = LTD_TGT(ltds, lr->lr_index);
5622         if (ltd == NULL) {
5623                 spin_unlock(&ltds->ltd_lock);
5624
5625                 RETURN(-ENXIO);
5626         }
5627
5628         list_del_init(&ltd->ltd_layout_phase_list);
5629         switch (lr->lr_event) {
5630         case LE_PHASE1_DONE:
5631                 if (lr->lr_status <= 0) {
5632                         ltd->ltd_layout_done = 1;
5633                         list_del_init(&ltd->ltd_layout_list);
5634                         lo->ll_flags |= LF_INCOMPLETE;
5635                         fail = true;
5636                         break;
5637                 }
5638
5639                 if (lr->lr_flags & LEF_FROM_OST) {
5640                         if (list_empty(&ltd->ltd_layout_list))
5641                                 list_add_tail(&ltd->ltd_layout_list,
5642                                               &llmd->llmd_ost_list);
5643                         list_add_tail(&ltd->ltd_layout_phase_list,
5644                                       &llmd->llmd_ost_phase2_list);
5645                 } else {
5646                         if (list_empty(&ltd->ltd_layout_list))
5647                                 list_add_tail(&ltd->ltd_layout_list,
5648                                               &llmd->llmd_mdt_list);
5649                         list_add_tail(&ltd->ltd_layout_phase_list,
5650                                       &llmd->llmd_mdt_phase2_list);
5651                 }
5652                 break;
5653         case LE_PHASE2_DONE:
5654                 ltd->ltd_layout_done = 1;
5655                 list_del_init(&ltd->ltd_layout_list);
5656                 break;
5657         case LE_PEER_EXIT:
5658                 fail = true;
5659                 ltd->ltd_layout_done = 1;
5660                 list_del_init(&ltd->ltd_layout_list);
5661                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
5662                         lo->ll_flags |= LF_INCOMPLETE;
5663                 break;
5664         default:
5665                 break;
5666         }
5667         spin_unlock(&ltds->ltd_lock);
5668
5669         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5670                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5671
5672                 memset(stop, 0, sizeof(*stop));
5673                 stop->ls_status = lr->lr_status;
5674                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5675                 lfsck_stop(env, lfsck->li_bottom, stop);
5676         } else if (lfsck_layout_master_to_orphan(llmd)) {
5677                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5678         }
5679
5680         RETURN(0);
5681 }
5682
5683 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5684                                         struct lfsck_component *com,
5685                                         struct lfsck_request *lr)
5686 {
5687         struct lfsck_instance            *lfsck = com->lc_lfsck;
5688         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5689         struct lfsck_layout_slave_target *llst;
5690         int                               rc;
5691         ENTRY;
5692
5693         switch (lr->lr_event) {
5694         case LE_FID_ACCESSED:
5695                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5696                 RETURN(0);
5697         case LE_CONDITIONAL_DESTROY:
5698                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5699                 RETURN(rc);
5700         case LE_PAIRS_VERIFY: {
5701                 lr->lr_status = LPVS_INIT;
5702                 /* Firstly, if the MDT-object which is claimed via OST-object
5703                  * local stored PFID xattr recognizes the OST-object, then it
5704                  * must be that the client given PFID is wrong. */
5705                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5706                                                     &lr->lr_fid3);
5707                 if (rc <= 0)
5708                         RETURN(0);
5709
5710                 lr->lr_status = LPVS_INCONSISTENT;
5711                 /* The OST-object local stored PFID xattr is stale. We need to
5712                  * check whether the MDT-object that is claimed via the client
5713                  * given PFID information recognizes the OST-object or not. If
5714                  * matches, then need to update the OST-object's PFID xattr. */
5715                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5716                                                     &lr->lr_fid2);
5717                 /* For rc < 0 case:
5718                  * We are not sure whether the client given PFID information
5719                  * is correct or not, do nothing to avoid improper fixing.
5720                  *
5721                  * For rc > 0 case:
5722                  * The client given PFID information is also invalid, we can
5723                  * NOT fix the OST-object inconsistency.
5724                  */
5725                 if (rc != 0)
5726                         RETURN(rc);
5727
5728                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5729                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5730
5731                 RETURN(rc);
5732         }
5733         case LE_PHASE2_DONE:
5734         case LE_PEER_EXIT:
5735                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5736                        "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5737                        lr->lr_event, lr->lr_index, lr->lr_status);
5738                 break;
5739         default:
5740                 RETURN(-EINVAL);
5741         }
5742
5743         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5744         if (llst == NULL)
5745                 RETURN(-ENXIO);
5746
5747         lfsck_layout_llst_put(llst);
5748         if (list_empty(&llsd->llsd_master_list))
5749                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5750
5751         if (lr->lr_event == LE_PEER_EXIT &&
5752             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5753                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5754
5755                 memset(stop, 0, sizeof(*stop));
5756                 stop->ls_status = lr->lr_status;
5757                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5758                 lfsck_stop(env, lfsck->li_bottom, stop);
5759         }
5760
5761         RETURN(0);
5762 }
5763
5764 static int lfsck_layout_query(const struct lu_env *env,
5765                               struct lfsck_component *com)
5766 {
5767         struct lfsck_layout *lo = com->lc_file_ram;
5768
5769         return lo->ll_status;
5770 }
5771
5772 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5773                                            struct lfsck_component *com,
5774                                            struct lfsck_tgt_descs *ltds,
5775                                            struct lfsck_tgt_desc *ltd,
5776                                            struct ptlrpc_request_set *set)
5777 {
5778         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5779         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5780         struct lfsck_request              *lr    = &info->lti_lr;
5781         struct lfsck_instance             *lfsck = com->lc_lfsck;
5782         int                                rc;
5783
5784         spin_lock(&ltds->ltd_lock);
5785         if (list_empty(&ltd->ltd_layout_list)) {
5786                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5787                 spin_unlock(&ltds->ltd_lock);
5788
5789                 return 0;
5790         }
5791
5792         list_del_init(&ltd->ltd_layout_phase_list);
5793         list_del_init(&ltd->ltd_layout_list);
5794         spin_unlock(&ltds->ltd_lock);
5795
5796         memset(lr, 0, sizeof(*lr));
5797         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5798         lr->lr_event = LE_PEER_EXIT;
5799         lr->lr_active = LFSCK_TYPE_LAYOUT;
5800         lr->lr_status = LS_CO_PAUSED;
5801         if (ltds == &lfsck->li_ost_descs)
5802                 lr->lr_flags = LEF_TO_OST;
5803
5804         laia->laia_com = com;
5805         laia->laia_ltds = ltds;
5806         atomic_inc(&ltd->ltd_ref);
5807         laia->laia_ltd = ltd;
5808         laia->laia_lr = lr;
5809         laia->laia_shared = 0;
5810
5811         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5812                                  lfsck_layout_master_async_interpret,
5813                                  laia, LFSCK_NOTIFY);
5814         if (rc != 0) {
5815                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to notify %s %x "
5816                        "for co-stop: rc = %d\n",
5817                        lfsck_lfsck2name(lfsck),
5818                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5819                        ltd->ltd_index, rc);
5820                 lfsck_tgt_put(ltd);
5821         }
5822
5823         return rc;
5824 }
5825
5826 /* with lfsck::li_lock held */
5827 static int lfsck_layout_slave_join(const struct lu_env *env,
5828                                    struct lfsck_component *com,
5829                                    struct lfsck_start_param *lsp)
5830 {
5831         struct lfsck_instance            *lfsck = com->lc_lfsck;
5832         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5833         struct lfsck_layout_slave_target *llst;
5834         struct lfsck_start               *start = lsp->lsp_start;
5835         int                               rc    = 0;
5836         ENTRY;
5837
5838         if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
5839                 RETURN(0);
5840
5841         if (!lsp->lsp_index_valid)
5842                 RETURN(-EINVAL);
5843
5844         /* If someone is running the LFSCK without orphan handling,
5845          * it will not maintain the object accessing rbtree. So we
5846          * cannot join it for orphan handling. */
5847         if (!llsd->llsd_rbtree_valid)
5848                 RETURN(-EBUSY);
5849
5850         spin_unlock(&lfsck->li_lock);
5851         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5852         spin_lock(&lfsck->li_lock);
5853         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5854                 spin_unlock(&lfsck->li_lock);
5855                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5856                                                       true);
5857                 if (llst != NULL)
5858                         lfsck_layout_llst_put(llst);
5859                 spin_lock(&lfsck->li_lock);
5860                 rc = -EAGAIN;
5861         }
5862
5863         RETURN(rc);
5864 }
5865
/*
 * Operation table of the layout LFSCK component on the master side.
 * lfsck_layout_setup() installs it as com->lc_ops when lfsck->li_master
 * is set. Compared with the slave table it additionally provides
 * .lfsck_interpret and .lfsck_stop_notify, and it has no .lfsck_join.
 */
static struct lfsck_operations lfsck_layout_master_ops = {
        .lfsck_reset            = lfsck_layout_reset,
        .lfsck_fail             = lfsck_layout_fail,
        .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
        .lfsck_prep             = lfsck_layout_master_prep,
        .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
        .lfsck_exec_dir         = lfsck_layout_exec_dir,
        .lfsck_post             = lfsck_layout_master_post,
        .lfsck_interpret        = lfsck_layout_master_async_interpret,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_master_double_scan,
        .lfsck_data_release     = lfsck_layout_master_data_release,
        .lfsck_quit             = lfsck_layout_master_quit,
        .lfsck_in_notify        = lfsck_layout_master_in_notify,
        .lfsck_query            = lfsck_layout_query,
        .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
};
5883
/*
 * Operation table of the layout LFSCK component on the slave side.
 * lfsck_layout_setup() installs it as com->lc_ops when lfsck->li_master
 * is not set. Reset, fail, exec_dir, dump and query handlers are shared
 * with the master table; .lfsck_join exists only here.
 */
static struct lfsck_operations lfsck_layout_slave_ops = {
        .lfsck_reset            = lfsck_layout_reset,
        .lfsck_fail             = lfsck_layout_fail,
        .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
        .lfsck_prep             = lfsck_layout_slave_prep,
        .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
        .lfsck_exec_dir         = lfsck_layout_exec_dir,
        .lfsck_post             = lfsck_layout_slave_post,
        .lfsck_dump             = lfsck_layout_dump,
        .lfsck_double_scan      = lfsck_layout_slave_double_scan,
        .lfsck_data_release     = lfsck_layout_slave_data_release,
        .lfsck_quit             = lfsck_layout_slave_quit,
        .lfsck_in_notify        = lfsck_layout_slave_in_notify,
        .lfsck_query            = lfsck_layout_query,
        .lfsck_join             = lfsck_layout_slave_join,
};
5900
5901 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5902 {
5903         struct lfsck_component  *com;
5904         struct lfsck_layout     *lo;
5905         struct dt_object        *root = NULL;
5906         struct dt_object        *obj;
5907         int                      rc;
5908         ENTRY;
5909
5910         OBD_ALLOC_PTR(com);
5911         if (com == NULL)
5912                 RETURN(-ENOMEM);
5913
5914         INIT_LIST_HEAD(&com->lc_link);
5915         INIT_LIST_HEAD(&com->lc_link_dir);
5916         init_rwsem(&com->lc_sem);
5917         atomic_set(&com->lc_ref, 1);
5918         com->lc_lfsck = lfsck;
5919         com->lc_type = LFSCK_TYPE_LAYOUT;
5920         if (lfsck->li_master) {
5921                 struct lfsck_layout_master_data *llmd;
5922
5923                 com->lc_ops = &lfsck_layout_master_ops;
5924                 OBD_ALLOC_PTR(llmd);
5925                 if (llmd == NULL)
5926                         GOTO(out, rc = -ENOMEM);
5927
5928                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5929                 spin_lock_init(&llmd->llmd_lock);
5930                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5931                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5932                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5933                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5934                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5935                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5936                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5937                 com->lc_data = llmd;
5938         } else {
5939                 struct lfsck_layout_slave_data *llsd;
5940
5941                 com->lc_ops = &lfsck_layout_slave_ops;
5942                 OBD_ALLOC_PTR(llsd);
5943                 if (llsd == NULL)
5944                         GOTO(out, rc = -ENOMEM);
5945
5946                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5947                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5948                 spin_lock_init(&llsd->llsd_lock);
5949                 llsd->llsd_rb_root = RB_ROOT;
5950                 rwlock_init(&llsd->llsd_rb_lock);
5951                 com->lc_data = llsd;
5952         }
5953         com->lc_file_size = sizeof(*lo);
5954         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5955         if (com->lc_file_ram == NULL)
5956                 GOTO(out, rc = -ENOMEM);
5957
5958         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5959         if (com->lc_file_disk == NULL)
5960                 GOTO(out, rc = -ENOMEM);
5961
5962         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5963         if (IS_ERR(root))
5964                 GOTO(out, rc = PTR_ERR(root));
5965
5966         if (unlikely(!dt_try_as_dir(env, root)))
5967                 GOTO(out, rc = -ENOTDIR);
5968
5969         obj = local_file_find_or_create(env, lfsck->li_los, root,
5970                                         lfsck_layout_name,
5971                                         S_IFREG | S_IRUGO | S_IWUSR);
5972         if (IS_ERR(obj))
5973                 GOTO(out, rc = PTR_ERR(obj));
5974
5975         com->lc_obj = obj;
5976         rc = lfsck_layout_load(env, com);
5977         if (rc > 0)
5978                 rc = lfsck_layout_reset(env, com, true);
5979         else if (rc == -ENOENT)
5980                 rc = lfsck_layout_init(env, com);
5981
5982         if (rc != 0)
5983                 GOTO(out, rc);
5984
5985         lo = com->lc_file_ram;
5986         switch (lo->ll_status) {
5987         case LS_INIT:
5988         case LS_COMPLETED:
5989         case LS_FAILED:
5990         case LS_STOPPED:
5991         case LS_PARTIAL:
5992                 spin_lock(&lfsck->li_lock);
5993                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5994                 spin_unlock(&lfsck->li_lock);
5995                 break;
5996         default:
5997                 CERROR("%s: unknown lfsck_layout status %d\n",
5998                        lfsck_lfsck2name(lfsck), lo->ll_status);
5999                 /* fall through */
6000         case LS_SCANNING_PHASE1:
6001         case LS_SCANNING_PHASE2:
6002                 /* No need to store the status to disk right now.
6003                  * If the system crashed before the status stored,
6004                  * it will be loaded back when next time. */
6005                 lo->ll_status = LS_CRASHED;
6006                 lo->ll_flags |= LF_INCOMPLETE;
6007                 /* fall through */
6008         case LS_PAUSED:
6009         case LS_CRASHED:
6010         case LS_CO_FAILED:
6011         case LS_CO_STOPPED:
6012         case LS_CO_PAUSED:
6013                 spin_lock(&lfsck->li_lock);
6014                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
6015                 spin_unlock(&lfsck->li_lock);
6016                 break;
6017         }
6018
6019         if (lo->ll_flags & LF_CRASHED_LASTID) {
6020                 LASSERT(lfsck->li_out_notify != NULL);
6021
6022                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
6023                                      LE_LASTID_REBUILDING);
6024         }
6025
6026         GOTO(out, rc = 0);
6027
6028 out:
6029         if (root != NULL && !IS_ERR(root))
6030                 lu_object_put(env, &root->do_lu);
6031
6032         if (rc != 0) {
6033                 lfsck_component_cleanup(env, com);
6034                 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
6035                        lfsck_lfsck2name(lfsck), rc);
6036         }
6037
6038         return rc;
6039 }
6040
/* Iterator state for walking the orphan OST-objects via the orphan index. */
struct lfsck_orphan_it {
        /* Layout component; the reference taken in init() is dropped in
         * fini(). */
        struct lfsck_component           *loi_com;
        /* The rbtree node currently being scanned; NULL makes next()
         * search the rbtree again for loi_key. */
        struct lfsck_rbtree_node         *loi_lrn;
        /* The slave target looked up in init(); fini() saves loi_key and
         * loi_hash into it before releasing it. */
        struct lfsck_layout_slave_target *loi_llst;
        /* FID of the OST-object at the current position. */
        struct lu_fid                     loi_key;
        /* Parent FID and owner (uid/gid) of the current orphan. */
        struct lu_orphan_rec              loi_rec;
        /* Cookie returned by store(); incremented each time next()
         * returns a record. */
        __u64                             loi_hash;
        /* Set once next() has run past the last candidate. */
        unsigned int                      loi_over:1;
};
6050
6051 static int lfsck_fid_match_idx(const struct lu_env *env,
6052                                struct lfsck_instance *lfsck,
6053                                const struct lu_fid *fid, int idx)
6054 {
6055         struct seq_server_site  *ss;
6056         struct lu_server_fld    *sf;
6057         struct lu_seq_range      range  = { 0 };
6058         int                      rc;
6059
6060         /* All abnormal cases will be returned to MDT0. */
6061         if (!fid_is_norm(fid)) {
6062                 if (idx == 0)
6063                         return 1;
6064
6065                 return 0;
6066         }
6067
6068         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
6069         if (unlikely(ss == NULL))
6070                 return -ENOTCONN;
6071
6072         sf = ss->ss_server_fld;
6073         LASSERT(sf != NULL);
6074
6075         fld_range_set_any(&range);
6076         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
6077         if (rc != 0)
6078                 return rc;
6079
6080         if (!fld_range_is_mdt(&range))
6081                 return -EINVAL;
6082
6083         if (range.lsr_index == idx)
6084                 return 1;
6085
6086         return 0;
6087 }
6088
6089 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
6090                                         struct dt_device *dev,
6091                                         struct dt_object *obj)
6092 {
6093         struct thandle *handle;
6094         int             rc;
6095         ENTRY;
6096
6097         handle = dt_trans_create(env, dev);
6098         if (IS_ERR(handle))
6099                 RETURN_EXIT;
6100
6101         rc = dt_declare_ref_del(env, obj, handle);
6102         if (rc != 0)
6103                 GOTO(stop, rc);
6104
6105         rc = dt_declare_destroy(env, obj, handle);
6106         if (rc != 0)
6107                 GOTO(stop, rc);
6108
6109         rc = dt_trans_start_local(env, dev, handle);
6110         if (rc != 0)
6111                 GOTO(stop, rc);
6112
6113         dt_write_lock(env, obj, 0);
6114         rc = dt_ref_del(env, obj, handle);
6115         if (rc == 0)
6116                 rc = dt_destroy(env, obj, handle);
6117         dt_write_unlock(env, obj);
6118
6119         GOTO(stop, rc);
6120
6121 stop:
6122         dt_trans_stop(env, dev, handle);
6123
6124         CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
6125                PFID(lfsck_dto2fid(obj)), rc);
6126
6127         RETURN_EXIT;
6128 }
6129
/* Lookup is not supported on the orphan index: always returns -EOPNOTSUPP
 * and touches none of its arguments. */
static int lfsck_orphan_index_lookup(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6138
/* Declaring an insert is not supported on the orphan index: always
 * returns -EOPNOTSUPP and touches none of its arguments. */
static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_rec *rec,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6147
/* Insert is not supported on the orphan index: always returns -EOPNOTSUPP
 * and touches none of its arguments. */
static int lfsck_orphan_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa,
                                     int ignore_quota)
{
        return -EOPNOTSUPP;
}
6158
/* Declaring a delete is not supported on the orphan index: always
 * returns -EOPNOTSUPP and touches none of its arguments. */
static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6166
/* Delete is not supported on the orphan index: always returns -EOPNOTSUPP
 * and touches none of its arguments. */
static int lfsck_orphan_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6175
6176 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
6177                                           struct dt_object *dt,
6178                                           __u32 attr,
6179                                           struct lustre_capa *capa)
6180 {
6181         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
6182         struct lfsck_instance           *lfsck;
6183         struct lfsck_component          *com    = NULL;
6184         struct lfsck_layout_slave_data  *llsd;
6185         struct lfsck_orphan_it          *it     = NULL;
6186         int                              rc     = 0;
6187         ENTRY;
6188
6189         lfsck = lfsck_instance_find(dev, true, false);
6190         if (unlikely(lfsck == NULL))
6191                 RETURN(ERR_PTR(-ENXIO));
6192
6193         com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
6194         if (unlikely(com == NULL))
6195                 GOTO(out, rc = -ENOENT);
6196
6197         llsd = com->lc_data;
6198         if (!llsd->llsd_rbtree_valid)
6199                 GOTO(out, rc = -ESRCH);
6200
6201         OBD_ALLOC_PTR(it);
6202         if (it == NULL)
6203                 GOTO(out, rc = -ENOMEM);
6204
6205         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
6206         if (it->loi_llst == NULL)
6207                 GOTO(out, rc = -ENXIO);
6208
6209         if (dev->dd_record_fid_accessed) {
6210                 /* The first iteration against the rbtree, scan the whole rbtree
6211                  * to remove the nodes which do NOT need to be handled. */
6212                 write_lock(&llsd->llsd_rb_lock);
6213                 if (dev->dd_record_fid_accessed) {
6214                         struct rb_node                  *node;
6215                         struct rb_node                  *next;
6216                         struct lfsck_rbtree_node        *lrn;
6217
6218                         /* No need to record the fid accessing anymore. */
6219                         dev->dd_record_fid_accessed = 0;
6220
6221                         node = rb_first(&llsd->llsd_rb_root);
6222                         while (node != NULL) {
6223                                 next = rb_next(node);
6224                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
6225                                                lrn_node);
6226                                 if (atomic_read(&lrn->lrn_known_count) <=
6227                                     atomic_read(&lrn->lrn_accessed_count)) {
6228                                         rb_erase(node, &llsd->llsd_rb_root);
6229                                         lfsck_rbtree_free(lrn);
6230                                 }
6231                                 node = next;
6232                         }
6233                 }
6234                 write_unlock(&llsd->llsd_rb_lock);
6235         }
6236
6237         /* read lock the rbtree when init, and unlock when fini */
6238         read_lock(&llsd->llsd_rb_lock);
6239         it->loi_com = com;
6240         com = NULL;
6241
6242         GOTO(out, rc = 0);
6243
6244 out:
6245         if (com != NULL)
6246                 lfsck_component_put(env, com);
6247
6248         CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
6249                lfsck_lfsck2name(lfsck), rc);
6250
6251         lfsck_instance_put(env, lfsck);
6252         if (rc != 0) {
6253                 if (it != NULL)
6254                         OBD_FREE_PTR(it);
6255
6256                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
6257         }
6258
6259         return (struct dt_it *)it;
6260 }
6261
6262 static void lfsck_orphan_it_fini(const struct lu_env *env,
6263                                  struct dt_it *di)
6264 {
6265         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
6266         struct lfsck_component           *com   = it->loi_com;
6267         struct lfsck_layout_slave_data   *llsd;
6268         struct lfsck_layout_slave_target *llst;
6269
6270         if (com != NULL) {
6271                 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
6272                        lfsck_lfsck2name(com->lc_lfsck));
6273
6274                 llsd = com->lc_data;
6275                 read_unlock(&llsd->llsd_rb_lock);
6276                 llst = it->loi_llst;
6277                 LASSERT(llst != NULL);
6278
6279                 /* Save the key and hash for iterate next. */
6280                 llst->llst_fid = it->loi_key;
6281                 llst->llst_hash = it->loi_hash;
6282                 lfsck_layout_llst_put(llst);
6283                 lfsck_component_put(env, com);
6284         }
6285         OBD_FREE_PTR(it);
6286 }
6287
/**
 * Move the orphan iterator to the next orphan candidate.
 *
 * Starting from it->loi_key, walk the rbtree nodes of the slave data and
 * look for an OST-object whose bit is set in the node's "known" bitmap but
 * not in its "accessed" bitmap. Candidates that do not exist, are dead, or
 * claim a parent that does not belong to the MDT index of the iterator's
 * slave target are skipped. On success it->loi_key holds the FID of the
 * object found, it->loi_rec holds its parent FID and owner, and
 * it->loi_hash is incremented.
 *
 * \param[in] env	execution environment
 * \param[in] di	the orphan iterator (a struct lfsck_orphan_it)
 *
 * \retval       +1: the iteration finished
 * \retval        0: on success, not finished
 * \retval      -ve: on error
 */
static int lfsck_orphan_it_next(const struct lu_env *env,
                                struct dt_it *di)
{
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct filter_fid_old           *pfid   = &info->lti_old_pfid;
        struct lu_attr                  *la     = &info->lti_la;
        struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
        struct lu_fid                   *key    = &it->loi_key;
        struct lu_orphan_rec            *rec    = &it->loi_rec;
        struct lfsck_component          *com    = it->loi_com;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct dt_object                *obj;
        struct lfsck_rbtree_node        *lrn;
        int                              pos;
        int                              rc;
        __u32                            save;
        __u32                            idx    = it->loi_llst->llst_index;
        bool                             exact  = false;
        ENTRY;

        /* The end has already been reached by an earlier call. */
        if (it->loi_over)
                RETURN(1);

again0:
        /* Step 1: settle on the rbtree node and the OID to start from. */
        lrn = it->loi_lrn;
        if (lrn == NULL) {
                /* No cached node: search the rbtree for the current key. */
                lrn = lfsck_rbtree_search(llsd, key, &exact);
                if (lrn == NULL) {
                        it->loi_over = 1;
                        RETURN(1);
                }

                it->loi_lrn = lrn;
                /* Without an exact hit, restart from the first OID covered
                 * by the node that was returned. */
                if (!exact) {
                        key->f_seq = lrn->lrn_seq;
                        key->f_oid = lrn->lrn_first_oid;
                        key->f_ver = 0;
                }
        } else {
                /* A node is cached: advance by one OID within it. */
                key->f_oid++;
                /* The OID wrapped around: continue with the next sequence
                 * and search again. */
                if (unlikely(key->f_oid == 0)) {
                        key->f_seq++;
                        it->loi_lrn = NULL;
                        goto again0;
                }

                /* The OID left the range covered by this node's bitmap:
                 * search again for the node that covers it. */
                if (key->f_oid >=
                    lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
                        it->loi_lrn = NULL;
                        goto again0;
                }
        }

        /* Step 2: if every known object of this node has been accessed,
         * skip forward to the first node that still has candidates. */
        if (unlikely(atomic_read(&lrn->lrn_known_count) <=
                     atomic_read(&lrn->lrn_accessed_count))) {
                struct rb_node *next = rb_next(&lrn->lrn_node);

                while (next != NULL) {
                        lrn = rb_entry(next, struct lfsck_rbtree_node,
                                       lrn_node);
                        if (atomic_read(&lrn->lrn_known_count) >
                            atomic_read(&lrn->lrn_accessed_count))
                                break;
                        next = rb_next(next);
                }

                /* No node with candidates is left: iteration is over. */
                if (next == NULL) {
                        it->loi_over = 1;
                        RETURN(1);
                }

                it->loi_lrn = lrn;
                key->f_seq = lrn->lrn_seq;
                key->f_oid = lrn->lrn_first_oid;
                key->f_ver = 0;
        }

        /* Bit position of the current OID inside the node's bitmaps. */
        pos = key->f_oid - lrn->lrn_first_oid;

again1:
        /* Step 3: find the next known object at or after 'pos'. */
        pos = find_next_bit(lrn->lrn_known_bitmap,
                            LFSCK_RBTREE_BITMAP_WIDTH, pos);
        if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
                /* This node is exhausted: move the key just past it (taking
                 * care of OID wrap-around) and search for the next node. */
                key->f_oid = lrn->lrn_first_oid + pos;
                if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
                        key->f_seq++;
                        key->f_oid = 0;
                }
                it->loi_lrn = NULL;
                goto again0;
        }

        /* Known and already accessed: not a candidate. */
        if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
                pos++;
                goto again1;
        }

        /* Step 4: inspect the candidate object itself. */
        key->f_oid = lrn->lrn_first_oid + pos;
        obj = lfsck_object_find(env, lfsck, key);
        if (IS_ERR(obj)) {
                rc = PTR_ERR(obj);
                /* A missing object is simply skipped. */
                if (rc == -ENOENT) {
                        pos++;
                        goto again1;
                }
                RETURN(rc);
        }

        /* From here on the object is read-locked and referenced; every
         * path below has to unlock and put it. */
        dt_read_lock(env, obj, 0);
        if (dt_object_exists(obj) == 0 ||
            lfsck_is_dead_obj(obj)) {
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
        if (rc != 0)
                GOTO(out, rc);

        /* Read the parent FID stored in the XATTR_NAME_FID xattr. */
        rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
                          XATTR_NAME_FID, BYPASS_CAPA);
        if (rc == -ENODATA) {
                /* For the pre-created OST-object, update the bitmap to avoid
                 * others LFSCK (second phase) iteration to touch it again. */
                if (la->la_ctime == 0) {
                        if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
                                atomic_inc(&lrn->lrn_accessed_count);

                        /* For the race between repairing dangling referenced
                         * MDT-object and unlink the file, it may left orphan
                         * OST-object there. Destroy it now! */
                        if (unlikely(!(la->la_mode & S_ISUID))) {
                                /* The read lock is dropped first; the
                                 * destroy takes the write lock itself. */
                                dt_read_unlock(env, obj);
                                lfsck_layout_destroy_orphan(env,
                                                            lfsck->li_bottom,
                                                            obj);
                                lfsck_object_put(env, obj);
                                pos++;
                                goto again1;
                        }
                } else if (idx == 0) {
                        /* If the orphan OST-object has no parent information,
                         * regard it as referenced by the MDT-object on MDT0. */
                        fid_zero(&rec->lor_fid);
                        rec->lor_uid = la->la_uid;
                        rec->lor_gid = la->la_gid;
                        GOTO(out, rc = 0);
                }

                /* No parent information and nothing to report for this
                 * index: go on with the next candidate. */
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        if (rc < 0)
                GOTO(out, rc);

        /* Only the two filter_fid layouts are accepted as xattr size. */
        if (rc != sizeof(struct filter_fid) &&
            rc != sizeof(struct filter_fid_old))
                GOTO(out, rc = -EINVAL);

        fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
        /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
         * MDT-object's FID::f_ver, instead it is the OST-object index in its
         * parent MDT-object's layout EA. */
        save = rec->lor_fid.f_stripe_idx;
        /* Clear the version for the MDT index check below; the stripe
         * index is put back afterwards from 'save'. */
        rec->lor_fid.f_ver = 0;
        rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
        /* If the orphan OST-object does not claim the MDT, then next.
         *
         * If we do not know whether it matches or not, then return it
         * to the MDT for further check. */
        if (rc == 0) {
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        rec->lor_fid.f_stripe_idx = save;
        rec->lor_uid = la->la_uid;
        rec->lor_gid = la->la_gid;

        CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
               lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
               rec->lor_uid, rec->lor_gid);

        GOTO(out, rc = 0);

out:
        dt_read_unlock(env, obj);
        lfsck_object_put(env, obj);
        /* The hash counts the records handed out so far. */
        if (rc == 0)
                it->loi_hash++;

        return rc;
}
6494
6495 /**
6496  * \retval       +1: locate to the exactly position
6497  * \retval        0: cannot locate to the exactly position,
6498  *                   call next() to move to a valid position.
6499  * \retval      -ve: on error
6500  */
6501 static int lfsck_orphan_it_get(const struct lu_env *env,
6502                                struct dt_it *di,
6503                                const struct dt_key *key)
6504 {
6505         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6506         int                      rc;
6507
6508         it->loi_key = *(struct lu_fid *)key;
6509         rc = lfsck_orphan_it_next(env, di);
6510         if (rc == 1)
6511                 return 0;
6512
6513         if (rc == 0)
6514                 return 1;
6515
6516         return rc;
6517 }
6518
/* Counterpart of lfsck_orphan_it_get(); intentionally does nothing. */
static void lfsck_orphan_it_put(const struct lu_env *env,
                                struct dt_it *di)
{
}
6523
/* Return the key at the current position: a pointer to the iterator's own
 * loi_key (a struct lu_fid), not a copy. */
static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
                                          const struct dt_it *di)
{
        struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;

        return (struct dt_key *)&it->loi_key;
}
6531
/* Keys of the orphan index are FIDs, so the key size is always
 * sizeof(struct lu_fid). */
static int lfsck_orphan_it_key_size(const struct lu_env *env,
                                    const struct dt_it *di)
{
        return sizeof(struct lu_fid);
}
6537
/* Copy the record at the current position (it->loi_rec) into \a rec, which
 * is written as a struct lu_orphan_rec. \a attr is ignored. Always
 * returns 0. */
static int lfsck_orphan_it_rec(const struct lu_env *env,
                               const struct dt_it *di,
                               struct dt_rec *rec,
                               __u32 attr)
{
        struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;

        *(struct lu_orphan_rec *)rec = it->loi_rec;

        return 0;
}
6549
/* Return the cookie of the current position, i.e. the iterator's loi_hash. */
static __u64 lfsck_orphan_it_store(const struct lu_env *env,
                                   const struct dt_it *di)
{
        struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;

        return it->loi_hash;
}
6557
6558 /**
6559  * \retval       +1: locate to the exactly position
6560  * \retval        0: cannot locate to the exactly position,
6561  *                   call next() to move to a valid position.
6562  * \retval      -ve: on error
6563  */
6564 static int lfsck_orphan_it_load(const struct lu_env *env,
6565                                 const struct dt_it *di,
6566                                 __u64 hash)
6567 {
6568         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6569         struct lfsck_layout_slave_target *llst = it->loi_llst;
6570         int                               rc;
6571
6572         LASSERT(llst != NULL);
6573
6574         if (hash != llst->llst_hash) {
6575                 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6576                        "iteration does not match the one when fini "
6577                        LPU64", to be reset.\n",
6578                        lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6579                        llst->llst_hash);
6580                 fid_zero(&llst->llst_fid);
6581                 llst->llst_hash = 0;
6582         }
6583
6584         it->loi_key = llst->llst_fid;
6585         it->loi_hash = llst->llst_hash;
6586         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6587         if (rc == 1)
6588                 return 0;
6589
6590         if (rc == 0)
6591                 return 1;
6592
6593         return rc;
6594 }
6595
/* Combined key/record retrieval is a no-op for the orphan index: returns 0
 * without writing anything to \a key_rec. */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
                                   const struct dt_it *di,
                                   void *key_rec)
{
        return 0;
}
6602
/*
 * Index operations of the orphan index. The lookup/insert/delete entries
 * all point to stubs returning -EOPNOTSUPP; only the iterator methods in
 * .dio_it do real work.
 */
const struct dt_index_operations lfsck_orphan_index_ops = {
        .dio_lookup             = lfsck_orphan_index_lookup,
        .dio_declare_insert     = lfsck_orphan_index_declare_insert,
        .dio_insert             = lfsck_orphan_index_insert,
        .dio_declare_delete     = lfsck_orphan_index_declare_delete,
        .dio_delete             = lfsck_orphan_index_delete,
        .dio_it = {
                .init           = lfsck_orphan_it_init,
                .fini           = lfsck_orphan_it_fini,
                .get            = lfsck_orphan_it_get,
                .put            = lfsck_orphan_it_put,
                .next           = lfsck_orphan_it_next,
                .key            = lfsck_orphan_it_key,
                .key_size       = lfsck_orphan_it_key_size,
                .rec            = lfsck_orphan_it_rec,
                .store          = lfsck_orphan_it_store,
                .load           = lfsck_orphan_it_load,
                .key_rec        = lfsck_orphan_it_key_rec,
        }
};