Whamcloud - gitweb
LU-4682 llite: a few fixes for migration.
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_linkea.h>
43 #include <lustre_fid.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <md_object.h>
48 #include <obd_class.h>
49
50 #include "lfsck_internal.h"
51
/* Magic value stored in lfsck_layout::ll_magic; lfsck_layout_load() treats
 * a trace file whose magic differs from this one as broken. */
#define LFSCK_LAYOUT_MAGIC 0xB173AE14

/* Name string of the layout LFSCK component. */
static const char lfsck_layout_name[] = "lfsck_layout";
55
56 struct lfsck_layout_seq {
57         struct list_head         lls_list;
58         __u64                    lls_seq;
59         __u64                    lls_lastid;
60         __u64                    lls_lastid_known;
61         struct dt_object        *lls_lastid_obj;
62         unsigned int             lls_dirty:1;
63 };
64
65 struct lfsck_layout_slave_target {
66         /* link into lfsck_layout_slave_data::llsd_master_list. */
67         struct list_head        llst_list;
68         /* The position for next record in the rbtree for iteration. */
69         struct lu_fid           llst_fid;
70         /* Dummy hash for iteration against the rbtree. */
71         __u64                   llst_hash;
72         __u64                   llst_gen;
73         atomic_t                llst_ref;
74         __u32                   llst_index;
75 };
76
77 struct lfsck_layout_slave_data {
78         /* list for lfsck_layout_seq */
79         struct list_head         llsd_seq_list;
80
81         /* list for the masters involve layout verification. */
82         struct list_head         llsd_master_list;
83         spinlock_t               llsd_lock;
84         __u64                    llsd_touch_gen;
85         struct dt_object        *llsd_rb_obj;
86         struct rb_root           llsd_rb_root;
87         rwlock_t                 llsd_rb_lock;
88         unsigned int             llsd_rbtree_valid:1;
89 };
90
91 struct lfsck_layout_object {
92         struct dt_object        *llo_obj;
93         struct lu_attr           llo_attr;
94         atomic_t                 llo_ref;
95         __u16                    llo_gen;
96 };
97
98 struct lfsck_layout_req {
99         struct list_head                 llr_list;
100         struct lfsck_layout_object      *llr_parent;
101         struct dt_object                *llr_child;
102         __u32                            llr_ost_idx;
103         __u32                            llr_lov_idx; /* offset in LOV EA */
104 };
105
106 struct lfsck_layout_master_data {
107         spinlock_t              llmd_lock;
108         struct list_head        llmd_req_list;
109
110         /* list for the ost targets involve layout verification. */
111         struct list_head        llmd_ost_list;
112
113         /* list for the ost targets in phase1 scanning. */
114         struct list_head        llmd_ost_phase1_list;
115
116         /* list for the ost targets in phase1 scanning. */
117         struct list_head        llmd_ost_phase2_list;
118
119         /* list for the mdt targets involve layout verification. */
120         struct list_head        llmd_mdt_list;
121
122         /* list for the mdt targets in phase1 scanning. */
123         struct list_head        llmd_mdt_phase1_list;
124
125         /* list for the mdt targets in phase1 scanning. */
126         struct list_head        llmd_mdt_phase2_list;
127
128         struct ptlrpc_thread    llmd_thread;
129         __u32                   llmd_touch_gen;
130         int                     llmd_prefetched;
131         int                     llmd_assistant_status;
132         int                     llmd_post_result;
133         unsigned int            llmd_to_post:1,
134                                 llmd_to_double_scan:1,
135                                 llmd_in_double_scan:1,
136                                 llmd_exit:1;
137 };
138
/* Argument bundle (export, component, slave target) carried by a
 * slave-side asynchronous request. */
struct lfsck_layout_slave_async_args {
        struct obd_export *llsaa_exp;
        struct lfsck_component *llsaa_com;
        struct lfsck_layout_slave_target *llsaa_llst;
};
144
145 static struct lfsck_layout_object *
146 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
147                          __u16 gen)
148 {
149         struct lfsck_layout_object *llo;
150         int                         rc;
151
152         OBD_ALLOC_PTR(llo);
153         if (llo == NULL)
154                 return ERR_PTR(-ENOMEM);
155
156         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
157         if (rc != 0) {
158                 OBD_FREE_PTR(llo);
159
160                 return ERR_PTR(rc);
161         }
162
163         lu_object_get(&obj->do_lu);
164         llo->llo_obj = obj;
165         /* The gen can be used to check whether some others have changed the
166          * file layout after LFSCK pre-fetching but before real verification. */
167         llo->llo_gen = gen;
168         atomic_set(&llo->llo_ref, 1);
169
170         return llo;
171 }
172
173 static inline void
174 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
175 {
176         if (atomic_dec_and_test(&llst->llst_ref)) {
177                 LASSERT(list_empty(&llst->llst_list));
178
179                 OBD_FREE_PTR(llst);
180         }
181 }
182
183 static inline int
184 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
185 {
186         struct lfsck_layout_slave_target *llst;
187         struct lfsck_layout_slave_target *tmp;
188         int                               rc   = 0;
189
190         OBD_ALLOC_PTR(llst);
191         if (llst == NULL)
192                 return -ENOMEM;
193
194         INIT_LIST_HEAD(&llst->llst_list);
195         llst->llst_gen = 0;
196         llst->llst_index = index;
197         atomic_set(&llst->llst_ref, 1);
198
199         spin_lock(&llsd->llsd_lock);
200         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
201                 if (tmp->llst_index == index) {
202                         rc = -EALREADY;
203                         break;
204                 }
205         }
206         if (rc == 0)
207                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
208         spin_unlock(&llsd->llsd_lock);
209
210         if (rc != 0)
211                 OBD_FREE_PTR(llst);
212
213         return rc;
214 }
215
216 static inline void
217 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
218                       struct lfsck_layout_slave_target *llst)
219 {
220         bool del = false;
221
222         spin_lock(&llsd->llsd_lock);
223         if (!list_empty(&llst->llst_list)) {
224                 list_del_init(&llst->llst_list);
225                 del = true;
226         }
227         spin_unlock(&llsd->llsd_lock);
228
229         if (del)
230                 lfsck_layout_llst_put(llst);
231 }
232
233 static inline struct lfsck_layout_slave_target *
234 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
235                                __u32 index, bool unlink)
236 {
237         struct lfsck_layout_slave_target *llst;
238
239         spin_lock(&llsd->llsd_lock);
240         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
241                 if (llst->llst_index == index) {
242                         if (unlink)
243                                 list_del_init(&llst->llst_list);
244                         else
245                                 atomic_inc(&llst->llst_ref);
246                         spin_unlock(&llsd->llsd_lock);
247
248                         return llst;
249                 }
250         }
251         spin_unlock(&llsd->llsd_lock);
252
253         return NULL;
254 }
255
256 static inline void lfsck_layout_object_put(const struct lu_env *env,
257                                            struct lfsck_layout_object *llo)
258 {
259         if (atomic_dec_and_test(&llo->llo_ref)) {
260                 lfsck_object_put(env, llo->llo_obj);
261                 OBD_FREE_PTR(llo);
262         }
263 }
264
265 static struct lfsck_layout_req *
266 lfsck_layout_req_init(struct lfsck_layout_object *parent,
267                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
268 {
269         struct lfsck_layout_req *llr;
270
271         OBD_ALLOC_PTR(llr);
272         if (llr == NULL)
273                 return ERR_PTR(-ENOMEM);
274
275         INIT_LIST_HEAD(&llr->llr_list);
276         atomic_inc(&parent->llo_ref);
277         llr->llr_parent = parent;
278         llr->llr_child = child;
279         llr->llr_ost_idx = ost_idx;
280         llr->llr_lov_idx = lov_idx;
281
282         return llr;
283 }
284
285 static inline void lfsck_layout_req_fini(const struct lu_env *env,
286                                          struct lfsck_layout_req *llr)
287 {
288         lu_object_put(env, &llr->llr_child->do_lu);
289         lfsck_layout_object_put(env, llr->llr_parent);
290         OBD_FREE_PTR(llr);
291 }
292
293 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
294 {
295         bool empty = false;
296
297         spin_lock(&llmd->llmd_lock);
298         if (list_empty(&llmd->llmd_req_list))
299                 empty = true;
300         spin_unlock(&llmd->llmd_lock);
301
302         return empty;
303 }
304
305 static int lfsck_layout_get_lovea(const struct lu_env *env,
306                                   struct dt_object *obj,
307                                   struct lu_buf *buf, ssize_t *buflen)
308 {
309         int rc;
310
311 again:
312         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
313         if (rc == -ERANGE) {
314                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
315                                   BYPASS_CAPA);
316                 if (rc <= 0)
317                         return rc;
318
319                 lu_buf_realloc(buf, rc);
320                 if (buflen != NULL)
321                         *buflen = buf->lb_len;
322
323                 if (buf->lb_buf == NULL)
324                         return -ENOMEM;
325
326                 goto again;
327         }
328
329         if (rc == -ENODATA)
330                 rc = 0;
331
332         if (rc <= 0)
333                 return rc;
334
335         if (unlikely(buf->lb_buf == NULL)) {
336                 lu_buf_alloc(buf, rc);
337                 if (buflen != NULL)
338                         *buflen = buf->lb_len;
339
340                 if (buf->lb_buf == NULL)
341                         return -ENOMEM;
342
343                 goto again;
344         }
345
346         return rc;
347 }
348
349 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
350 {
351         __u32 magic;
352         __u32 patten;
353
354         magic = le32_to_cpu(lmm->lmm_magic);
355         /* If magic crashed, keep it there. Sometime later, during OST-object
356          * orphan handling, if some OST-object(s) back-point to it, it can be
357          * verified and repaired. */
358         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
359                 return -EINVAL;
360
361         patten = le32_to_cpu(lmm->lmm_pattern);
362         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
363         if (patten != LOV_PATTERN_RAID0)
364                 return -EOPNOTSUPP;
365
366         return 0;
367 }
368
/*
 * Geometry of the per-node bitmaps of the layout rbtree.
 *
 * LFSCK_RBTREE_BITMAP_SIZE  - size of one bitmap in bytes.
 * LFSCK_RBTREE_BITMAP_WIDTH - number of bits, i.e. number of OIDs, that
 *                             one bitmap (and so one rbtree node) covers.
 * LFSCK_RBTREE_BITMAP_MASK  - mask selecting the bit index of an OID
 *                             within its node; its complement yields the
 *                             node's first OID.
 *
 * The mask has to be derived from the WIDTH (bits), not from the SIZE
 * (bytes): lfsck_rbtree_cmp() treats a node as covering WIDTH consecutive
 * OIDs, so nodes must start on WIDTH boundaries and bit indexes must span
 * the whole [0, WIDTH) range. Otherwise the ranges of different nodes
 * overlap and distinct OIDs share the same bit.
 */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
372
373 struct lfsck_rbtree_node {
374         struct rb_node   lrn_node;
375         __u64            lrn_seq;
376         __u32            lrn_first_oid;
377         atomic_t         lrn_known_count;
378         atomic_t         lrn_accessed_count;
379         void            *lrn_known_bitmap;
380         void            *lrn_accessed_bitmap;
381 };
382
383 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
384                                    __u64 seq, __u32 oid)
385 {
386         if (seq < lrn->lrn_seq)
387                 return -1;
388
389         if (seq > lrn->lrn_seq)
390                 return 1;
391
392         if (oid < lrn->lrn_first_oid)
393                 return -1;
394
395         if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH)
396                 return 1;
397
398         return 0;
399 }
400
401 /* The caller should hold llsd->llsd_rb_lock. */
402 static struct lfsck_rbtree_node *
403 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
404                     const struct lu_fid *fid, bool *exact)
405 {
406         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
407         struct rb_node           *prev  = NULL;
408         struct lfsck_rbtree_node *lrn   = NULL;
409         int                       rc    = 0;
410
411         if (exact != NULL)
412                 *exact = true;
413
414         while (node != NULL) {
415                 prev = node;
416                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
417                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
418                 if (rc < 0)
419                         node = node->rb_left;
420                 else if (rc > 0)
421                         node = node->rb_right;
422                 else
423                         return lrn;
424         }
425
426         if (exact == NULL)
427                 return NULL;
428
429         /* If there is no exactly matched one, then to the next valid one. */
430         *exact = false;
431
432         /* The rbtree is empty. */
433         if (rc == 0)
434                 return NULL;
435
436         if (rc < 0)
437                 return lrn;
438
439         node = rb_next(prev);
440
441         /* The end of the rbtree. */
442         if (node == NULL)
443                 return NULL;
444
445         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
446
447         return lrn;
448 }
449
450 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
451                                                   const struct lu_fid *fid)
452 {
453         struct lfsck_rbtree_node *lrn;
454
455         OBD_ALLOC_PTR(lrn);
456         if (lrn == NULL)
457                 return ERR_PTR(-ENOMEM);
458
459         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
460         if (lrn->lrn_known_bitmap == NULL) {
461                 OBD_FREE_PTR(lrn);
462
463                 return ERR_PTR(-ENOMEM);
464         }
465
466         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
467         if (lrn->lrn_accessed_bitmap == NULL) {
468                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
469                 OBD_FREE_PTR(lrn);
470
471                 return ERR_PTR(-ENOMEM);
472         }
473
474         rb_init_node(&lrn->lrn_node);
475         lrn->lrn_seq = fid_seq(fid);
476         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
477         atomic_set(&lrn->lrn_known_count, 0);
478         atomic_set(&lrn->lrn_accessed_count, 0);
479
480         return lrn;
481 }
482
483 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
484 {
485         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
486         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
487         OBD_FREE_PTR(lrn);
488 }
489
490 /* The caller should hold lock. */
491 static struct lfsck_rbtree_node *
492 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
493                     struct lfsck_rbtree_node *lrn)
494 {
495         struct rb_node           **pos    = &(llsd->llsd_rb_root.rb_node);
496         struct rb_node            *parent = NULL;
497         struct lfsck_rbtree_node  *tmp;
498         int                        rc;
499
500         while (*pos) {
501                 parent = *pos;
502                 tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node);
503                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
504                 if (rc < 0)
505                         pos = &((*pos)->rb_left);
506                 else if (rc > 0)
507                         pos = &((*pos)->rb_right);
508                 else
509                         return tmp;
510         }
511
512         rb_link_node(&lrn->lrn_node, parent, pos);
513         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
514
515         return lrn;
516 }
517
518 extern const struct dt_index_operations lfsck_orphan_index_ops;
519
520 static int lfsck_rbtree_setup(const struct lu_env *env,
521                               struct lfsck_component *com)
522 {
523         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
524         struct lfsck_instance           *lfsck  = com->lc_lfsck;
525         struct dt_device                *dev    = lfsck->li_bottom;
526         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
527         struct dt_object                *obj;
528
529         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
530         fid->f_oid = lfsck_dev_idx(dev);
531         fid->f_ver = 0;
532         obj = dt_locate(env, dev, fid);
533         if (IS_ERR(obj))
534                 RETURN(PTR_ERR(obj));
535
536         /* Generate an in-RAM object to stand for the layout rbtree.
537          * Scanning the layout rbtree will be via the iteration over
538          * the object. In the future, the rbtree may be written onto
539          * disk with the object.
540          *
541          * Mark the object to be as exist. */
542         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
543         obj->do_index_ops = &lfsck_orphan_index_ops;
544         llsd->llsd_rb_obj = obj;
545         llsd->llsd_rbtree_valid = 1;
546         dev->dd_record_fid_accessed = 1;
547
548         return 0;
549 }
550
551 static void lfsck_rbtree_cleanup(const struct lu_env *env,
552                                  struct lfsck_component *com)
553 {
554         struct lfsck_instance           *lfsck = com->lc_lfsck;
555         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
556         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
557         struct rb_node                  *next;
558         struct lfsck_rbtree_node        *lrn;
559
560         lfsck->li_bottom->dd_record_fid_accessed = 0;
561         /* Invalid the rbtree, then no others will use it. */
562         write_lock(&llsd->llsd_rb_lock);
563         llsd->llsd_rbtree_valid = 0;
564         write_unlock(&llsd->llsd_rb_lock);
565
566         while (node != NULL) {
567                 next = rb_next(node);
568                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
569                 rb_erase(node, &llsd->llsd_rb_root);
570                 lfsck_rbtree_free(lrn);
571                 node = next;
572         }
573
574         if (llsd->llsd_rb_obj != NULL) {
575                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
576                 llsd->llsd_rb_obj = NULL;
577         }
578 }
579
580 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
581                                        struct lfsck_component *com,
582                                        const struct lu_fid *fid,
583                                        bool accessed)
584 {
585         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
586         struct lfsck_rbtree_node        *lrn;
587         bool                             insert = false;
588         int                              idx;
589         int                              rc     = 0;
590         ENTRY;
591
592         CDEBUG(D_LFSCK, "%s: update bitmap for "DFID"\n",
593                lfsck_lfsck2name(com->lc_lfsck), PFID(fid));
594
595         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
596                 RETURN_EXIT;
597
598         if (!fid_is_idif(fid) && !fid_is_norm(fid))
599                 RETURN_EXIT;
600
601         read_lock(&llsd->llsd_rb_lock);
602         if (!llsd->llsd_rbtree_valid)
603                 GOTO(unlock, rc = 0);
604
605         lrn = lfsck_rbtree_search(llsd, fid, NULL);
606         if (lrn == NULL) {
607                 struct lfsck_rbtree_node *tmp;
608
609                 LASSERT(!insert);
610
611                 read_unlock(&llsd->llsd_rb_lock);
612                 tmp = lfsck_rbtree_new(env, fid);
613                 if (IS_ERR(tmp))
614                         GOTO(out, rc = PTR_ERR(tmp));
615
616                 insert = true;
617                 write_lock(&llsd->llsd_rb_lock);
618                 if (!llsd->llsd_rbtree_valid) {
619                         lfsck_rbtree_free(tmp);
620                         GOTO(unlock, rc = 0);
621                 }
622
623                 lrn = lfsck_rbtree_insert(llsd, tmp);
624                 if (lrn != tmp)
625                         lfsck_rbtree_free(tmp);
626         }
627
628         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
629         /* Any accessed object must be a known object. */
630         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
631                 atomic_inc(&lrn->lrn_known_count);
632         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
633                 atomic_inc(&lrn->lrn_accessed_count);
634
635         GOTO(unlock, rc = 0);
636
637 unlock:
638         if (insert)
639                 write_unlock(&llsd->llsd_rb_lock);
640         else
641                 read_unlock(&llsd->llsd_rb_lock);
642 out:
643         if (rc != 0 && accessed) {
644                 struct lfsck_layout *lo = com->lc_file_ram;
645
646                 CERROR("%s: Fail to update object accessed bitmap, will cause "
647                        "incorrect LFSCK OST-object handling, so disable it to "
648                        "cancel orphan handling for related device. rc = %d.\n",
649                        lfsck_lfsck2name(com->lc_lfsck), rc);
650                 lo->ll_flags |= LF_INCOMPLETE;
651                 lfsck_rbtree_cleanup(env, com);
652         }
653 }
654
655 static inline bool is_dummy_lov_ost_data(struct lov_ost_data_v1 *obj)
656 {
657         if (fid_is_zero(&obj->l_ost_oi.oi_fid) &&
658             obj->l_ost_gen == 0 && obj->l_ost_idx == 0)
659                 return true;
660
661         return false;
662 }
663
664 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
665                                    const struct lfsck_layout *src)
666 {
667         int i;
668
669         des->ll_magic = le32_to_cpu(src->ll_magic);
670         des->ll_status = le32_to_cpu(src->ll_status);
671         des->ll_flags = le32_to_cpu(src->ll_flags);
672         des->ll_success_count = le32_to_cpu(src->ll_success_count);
673         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
674         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
675         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
676         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
677         des->ll_time_last_checkpoint =
678                                 le64_to_cpu(src->ll_time_last_checkpoint);
679         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
680         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
681         des->ll_pos_first_inconsistent =
682                         le64_to_cpu(src->ll_pos_first_inconsistent);
683         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
684         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
685         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
686         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
687         for (i = 0; i < LLIT_MAX; i++)
688                 des->ll_objs_repaired[i] =
689                                 le64_to_cpu(src->ll_objs_repaired[i]);
690         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
691 }
692
693 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
694                                    const struct lfsck_layout *src)
695 {
696         int i;
697
698         des->ll_magic = cpu_to_le32(src->ll_magic);
699         des->ll_status = cpu_to_le32(src->ll_status);
700         des->ll_flags = cpu_to_le32(src->ll_flags);
701         des->ll_success_count = cpu_to_le32(src->ll_success_count);
702         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
703         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
704         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
705         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
706         des->ll_time_last_checkpoint =
707                                 cpu_to_le64(src->ll_time_last_checkpoint);
708         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
709         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
710         des->ll_pos_first_inconsistent =
711                         cpu_to_le64(src->ll_pos_first_inconsistent);
712         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
713         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
714         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
715         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
716         for (i = 0; i < LLIT_MAX; i++)
717                 des->ll_objs_repaired[i] =
718                                 cpu_to_le64(src->ll_objs_repaired[i]);
719         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
720 }
721
722 /**
723  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
724  * \retval 0: succeed.
725  * \retval -ve: failed cases.
726  */
727 static int lfsck_layout_load(const struct lu_env *env,
728                              struct lfsck_component *com)
729 {
730         struct lfsck_layout             *lo     = com->lc_file_ram;
731         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
732         ssize_t                          size   = com->lc_file_size;
733         loff_t                           pos    = 0;
734         int                              rc;
735
736         rc = dbo->dbo_read(env, com->lc_obj,
737                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
738                            BYPASS_CAPA);
739         if (rc == 0) {
740                 return -ENOENT;
741         } else if (rc < 0) {
742                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
743                       lfsck_lfsck2name(com->lc_lfsck), rc);
744                 return rc;
745         } else if (rc != size) {
746                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
747                       lfsck_lfsck2name(com->lc_lfsck), rc);
748                 return 1;
749         }
750
751         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
752         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
753                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
754                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
755                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
756                 return 1;
757         }
758
759         return 0;
760 }
761
762 static int lfsck_layout_store(const struct lu_env *env,
763                               struct lfsck_component *com)
764 {
765         struct dt_object         *obj           = com->lc_obj;
766         struct lfsck_instance    *lfsck         = com->lc_lfsck;
767         struct lfsck_layout      *lo            = com->lc_file_disk;
768         struct thandle           *handle;
769         ssize_t                   size          = com->lc_file_size;
770         loff_t                    pos           = 0;
771         int                       rc;
772         ENTRY;
773
774         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
775         handle = dt_trans_create(env, lfsck->li_bottom);
776         if (IS_ERR(handle)) {
777                 rc = PTR_ERR(handle);
778                 CERROR("%s: fail to create trans for storing lfsck_layout: "
779                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
780                 RETURN(rc);
781         }
782
783         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
784                                      pos, handle);
785         if (rc != 0) {
786                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
787                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
788                 GOTO(out, rc);
789         }
790
791         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
792         if (rc != 0) {
793                 CERROR("%s: fail to start trans for storing lfsck_layout: "
794                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
795                 GOTO(out, rc);
796         }
797
798         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
799                              handle);
800         if (rc != 0)
801                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
802                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
803
804         GOTO(out, rc);
805
806 out:
807         dt_trans_stop(env, lfsck->li_bottom, handle);
808
809         return rc;
810 }
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
908                fid_seq(lfsck_dto2fid(obj)));
909
910         if (bk->lb_param & LPF_DRYRUN)
911                 return 0;
912
913         memset(la, 0, sizeof(*la));
914         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
915         la->la_valid = LA_MODE | LA_UID | LA_GID;
916         dof->dof_type = dt_mode_to_dft(S_IFREG);
917
918         th = dt_trans_create(env, dt);
919         if (IS_ERR(th))
920                 RETURN(rc = PTR_ERR(th));
921
922         rc = dt_declare_create(env, obj, la, NULL, dof, th);
923         if (rc != 0)
924                 GOTO(stop, rc);
925
926         rc = dt_declare_record_write(env, obj,
927                                      lfsck_buf_get(env, &lastid,
928                                                    sizeof(lastid)),
929                                      pos, th);
930         if (rc != 0)
931                 GOTO(stop, rc);
932
933         rc = dt_trans_start_local(env, dt, th);
934         if (rc != 0)
935                 GOTO(stop, rc);
936
937         dt_write_lock(env, obj, 0);
938         if (likely(!dt_object_exists(obj))) {
939                 rc = dt_create(env, obj, la, NULL, dof, th);
940                 if (rc == 0)
941                         rc = dt_record_write(env, obj,
942                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
943                                 &pos, th);
944         }
945         dt_write_unlock(env, obj);
946
947         GOTO(stop, rc);
948
949 stop:
950         dt_trans_stop(env, dt, th);
951
952         return rc;
953 }
954
955 static int
956 lfsck_layout_lastid_reload(const struct lu_env *env,
957                            struct lfsck_component *com,
958                            struct lfsck_layout_seq *lls)
959 {
960         __u64   lastid;
961         loff_t  pos     = 0;
962         int     rc;
963
964         dt_read_lock(env, lls->lls_lastid_obj, 0);
965         rc = dt_record_read(env, lls->lls_lastid_obj,
966                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
967         dt_read_unlock(env, lls->lls_lastid_obj);
968         if (unlikely(rc != 0))
969                 return rc;
970
971         lastid = le64_to_cpu(lastid);
972         if (lastid < lls->lls_lastid_known) {
973                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
974                 struct lfsck_layout     *lo     = com->lc_file_ram;
975
976                 lls->lls_lastid = lls->lls_lastid_known;
977                 lls->lls_dirty = 1;
978                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
979                         LASSERT(lfsck->li_out_notify != NULL);
980
981                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
982                                              LE_LASTID_REBUILDING);
983                         lo->ll_flags |= LF_CRASHED_LASTID;
984                 }
985         } else if (lastid >= lls->lls_lastid) {
986                 lls->lls_lastid = lastid;
987                 lls->lls_dirty = 0;
988         }
989
990         return 0;
991 }
992
993 static int
994 lfsck_layout_lastid_store(const struct lu_env *env,
995                           struct lfsck_component *com)
996 {
997         struct lfsck_instance           *lfsck  = com->lc_lfsck;
998         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
999         struct dt_device                *dt     = lfsck->li_bottom;
1000         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1001         struct lfsck_layout_seq         *lls;
1002         struct thandle                  *th;
1003         __u64                            lastid;
1004         int                              rc     = 0;
1005         int                              rc1    = 0;
1006
1007         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1008                 loff_t pos = 0;
1009
1010                 /* XXX: Add the code back if we really found related
1011                  *      inconsistent cases in the future. */
1012 #if 0
1013                 if (!lls->lls_dirty) {
1014                         /* In OFD, before the pre-creation, the LAST_ID
1015                          * file will be updated firstly, which may hide
1016                          * some potential crashed cases. For example:
1017                          *
1018                          * The old obj1's ID is higher than old LAST_ID
1019                          * but lower than the new LAST_ID, but the LFSCK
1020                          * have not touch the obj1 until the OFD updated
1021                          * the LAST_ID. So the LFSCK does not regard it
1022                          * as crashed case. But when OFD does not create
1023                          * successfully, it will set the LAST_ID as the
1024                          * real created objects' ID, then LFSCK needs to
1025                          * found related inconsistency. */
1026                         rc = lfsck_layout_lastid_reload(env, com, lls);
1027                         if (likely(!lls->lls_dirty))
1028                                 continue;
1029                 }
1030 #endif
1031
1032                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
1033                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
1034
1035                 if (bk->lb_param & LPF_DRYRUN) {
1036                         lls->lls_dirty = 0;
1037                         continue;
1038                 }
1039
1040                 th = dt_trans_create(env, dt);
1041                 if (IS_ERR(th)) {
1042                         rc1 = PTR_ERR(th);
1043                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
1044                                lfsck_lfsck2name(com->lc_lfsck),
1045                                lls->lls_seq, rc1);
1046                         continue;
1047                 }
1048
1049                 lastid = cpu_to_le64(lls->lls_lastid);
1050                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1051                                              lfsck_buf_get(env, &lastid,
1052                                                            sizeof(lastid)),
1053                                              pos, th);
1054                 if (rc != 0)
1055                         goto stop;
1056
1057                 rc = dt_trans_start_local(env, dt, th);
1058                 if (rc != 0)
1059                         goto stop;
1060
1061                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1062                 rc = dt_record_write(env, lls->lls_lastid_obj,
1063                                      lfsck_buf_get(env, &lastid,
1064                                      sizeof(lastid)), &pos, th);
1065                 dt_write_unlock(env, lls->lls_lastid_obj);
1066                 if (rc == 0)
1067                         lls->lls_dirty = 0;
1068
1069 stop:
1070                 dt_trans_stop(env, dt, th);
1071                 if (rc != 0) {
1072                         rc1 = rc;
1073                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
1074                                lfsck_lfsck2name(com->lc_lfsck),
1075                                lls->lls_seq, rc1);
1076                 }
1077         }
1078
1079         return rc1;
1080 }
1081
1082 static int
1083 lfsck_layout_lastid_load(const struct lu_env *env,
1084                          struct lfsck_component *com,
1085                          struct lfsck_layout_seq *lls)
1086 {
1087         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1088         struct lfsck_layout     *lo     = com->lc_file_ram;
1089         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1090         struct dt_object        *obj;
1091         loff_t                   pos    = 0;
1092         int                      rc;
1093         ENTRY;
1094
1095         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1096         obj = dt_locate(env, lfsck->li_bottom, fid);
1097         if (IS_ERR(obj))
1098                 RETURN(PTR_ERR(obj));
1099
1100         /* LAST_ID crashed, to be rebuilt */
1101         if (!dt_object_exists(obj)) {
1102                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1103                         LASSERT(lfsck->li_out_notify != NULL);
1104
1105                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1106                                              LE_LASTID_REBUILDING);
1107                         lo->ll_flags |= LF_CRASHED_LASTID;
1108
1109                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1110                             cfs_fail_val > 0) {
1111                                 struct l_wait_info lwi = LWI_TIMEOUT(
1112                                                 cfs_time_seconds(cfs_fail_val),
1113                                                 NULL, NULL);
1114
1115                                 up_write(&com->lc_sem);
1116                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1117                                              !thread_is_running(&lfsck->li_thread),
1118                                              &lwi);
1119                                 down_write(&com->lc_sem);
1120                         }
1121                 }
1122
1123                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1124         } else {
1125                 dt_read_lock(env, obj, 0);
1126                 rc = dt_read(env, obj,
1127                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1128                         &pos);
1129                 dt_read_unlock(env, obj);
1130                 if (rc != 0 && rc != sizeof(__u64))
1131                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1132
1133                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1134                         LASSERT(lfsck->li_out_notify != NULL);
1135
1136                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1137                                              LE_LASTID_REBUILDING);
1138                         lo->ll_flags |= LF_CRASHED_LASTID;
1139                 }
1140
1141                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1142                 rc = 0;
1143         }
1144
1145         GOTO(out, rc);
1146
1147 out:
1148         if (rc != 0)
1149                 lfsck_object_put(env, obj);
1150         else
1151                 lls->lls_lastid_obj = obj;
1152
1153         return rc;
1154 }
1155
1156 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1157                                                struct ptlrpc_request *req,
1158                                                void *args, int rc)
1159 {
1160         struct lfsck_async_interpret_args *laia = args;
1161         struct lfsck_component            *com  = laia->laia_com;
1162         struct lfsck_layout_master_data   *llmd = com->lc_data;
1163         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1164         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1165         struct lfsck_request              *lr   = laia->laia_lr;
1166
1167         switch (lr->lr_event) {
1168         case LE_START:
1169                 if (rc != 0) {
1170                         struct lfsck_layout *lo = com->lc_file_ram;
1171
1172                         CERROR("%s: fail to notify %s %x for layout start: "
1173                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1174                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1175                                ltd->ltd_index, rc);
1176                         lo->ll_flags |= LF_INCOMPLETE;
1177                         break;
1178                 }
1179
1180                 spin_lock(&ltds->ltd_lock);
1181                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1182                         spin_unlock(&ltds->ltd_lock);
1183                         break;
1184                 }
1185
1186                 if (lr->lr_flags & LEF_TO_OST) {
1187                         if (list_empty(&ltd->ltd_layout_list))
1188                                 list_add_tail(&ltd->ltd_layout_list,
1189                                               &llmd->llmd_ost_list);
1190                         if (list_empty(&ltd->ltd_layout_phase_list))
1191                                 list_add_tail(&ltd->ltd_layout_phase_list,
1192                                               &llmd->llmd_ost_phase1_list);
1193                 } else {
1194                         if (list_empty(&ltd->ltd_layout_list))
1195                                 list_add_tail(&ltd->ltd_layout_list,
1196                                               &llmd->llmd_mdt_list);
1197                         if (list_empty(&ltd->ltd_layout_phase_list))
1198                                 list_add_tail(&ltd->ltd_layout_phase_list,
1199                                               &llmd->llmd_mdt_phase1_list);
1200                 }
1201                 spin_unlock(&ltds->ltd_lock);
1202                 break;
1203         case LE_STOP:
1204         case LE_PHASE1_DONE:
1205         case LE_PHASE2_DONE:
1206         case LE_PEER_EXIT:
1207                 if (rc != 0 && rc != -EALREADY)
1208                         CWARN("%s: fail to notify %s %x for layout: "
1209                               "event = %d, rc = %d\n",
1210                               lfsck_lfsck2name(com->lc_lfsck),
1211                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1212                               ltd->ltd_index, lr->lr_event, rc);
1213                 break;
1214         case LE_QUERY: {
1215                 struct lfsck_reply *reply;
1216
1217                 if (rc != 0) {
1218                         spin_lock(&ltds->ltd_lock);
1219                         list_del_init(&ltd->ltd_layout_phase_list);
1220                         list_del_init(&ltd->ltd_layout_list);
1221                         spin_unlock(&ltds->ltd_lock);
1222                         break;
1223                 }
1224
1225                 reply = req_capsule_server_get(&req->rq_pill,
1226                                                &RMF_LFSCK_REPLY);
1227                 if (reply == NULL) {
1228                         rc = -EPROTO;
1229                         CERROR("%s: invalid return value: rc = %d\n",
1230                                lfsck_lfsck2name(com->lc_lfsck), rc);
1231                         spin_lock(&ltds->ltd_lock);
1232                         list_del_init(&ltd->ltd_layout_phase_list);
1233                         list_del_init(&ltd->ltd_layout_list);
1234                         spin_unlock(&ltds->ltd_lock);
1235                         break;
1236                 }
1237
1238                 switch (reply->lr_status) {
1239                 case LS_SCANNING_PHASE1:
1240                         break;
1241                 case LS_SCANNING_PHASE2:
1242                         spin_lock(&ltds->ltd_lock);
1243                         list_del_init(&ltd->ltd_layout_phase_list);
1244                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1245                                 spin_unlock(&ltds->ltd_lock);
1246                                 break;
1247                         }
1248
1249                         if (lr->lr_flags & LEF_TO_OST)
1250                                 list_add_tail(&ltd->ltd_layout_phase_list,
1251                                               &llmd->llmd_ost_phase2_list);
1252                         else
1253                                 list_add_tail(&ltd->ltd_layout_phase_list,
1254                                               &llmd->llmd_mdt_phase2_list);
1255                         spin_unlock(&ltds->ltd_lock);
1256                         break;
1257                 default:
1258                         spin_lock(&ltds->ltd_lock);
1259                         list_del_init(&ltd->ltd_layout_phase_list);
1260                         list_del_init(&ltd->ltd_layout_list);
1261                         spin_unlock(&ltds->ltd_lock);
1262                         break;
1263                 }
1264                 break;
1265         }
1266         default:
1267                 CERROR("%s: unexpected event: rc = %d\n",
1268                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1269                 break;
1270         }
1271
1272         if (!laia->laia_shared) {
1273                 lfsck_tgt_put(ltd);
1274                 lfsck_component_put(env, com);
1275         }
1276
1277         return 0;
1278 }
1279
1280 static int lfsck_layout_master_query_others(const struct lu_env *env,
1281                                             struct lfsck_component *com)
1282 {
1283         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1284         struct lfsck_request              *lr    = &info->lti_lr;
1285         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1286         struct lfsck_instance             *lfsck = com->lc_lfsck;
1287         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1288         struct ptlrpc_request_set         *set;
1289         struct lfsck_tgt_descs            *ltds;
1290         struct lfsck_tgt_desc             *ltd;
1291         struct list_head                  *head;
1292         int                                rc    = 0;
1293         int                                rc1   = 0;
1294         ENTRY;
1295
1296         set = ptlrpc_prep_set();
1297         if (set == NULL)
1298                 RETURN(-ENOMEM);
1299
1300         llmd->llmd_touch_gen++;
1301         memset(lr, 0, sizeof(*lr));
1302         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1303         lr->lr_event = LE_QUERY;
1304         lr->lr_active = LT_LAYOUT;
1305         laia->laia_com = com;
1306         laia->laia_lr = lr;
1307         laia->laia_shared = 0;
1308
1309         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1310                 ltds = &lfsck->li_mdt_descs;
1311                 lr->lr_flags = 0;
1312                 head = &llmd->llmd_mdt_phase1_list;
1313         } else {
1314
1315 again:
1316                 ltds = &lfsck->li_ost_descs;
1317                 lr->lr_flags = LEF_TO_OST;
1318                 head = &llmd->llmd_ost_phase1_list;
1319         }
1320
1321         laia->laia_ltds = ltds;
1322         spin_lock(&ltds->ltd_lock);
1323         while (!list_empty(head)) {
1324                 ltd = list_entry(head->next,
1325                                  struct lfsck_tgt_desc,
1326                                  ltd_layout_phase_list);
1327                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1328                         break;
1329
1330                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1331                 list_del(&ltd->ltd_layout_phase_list);
1332                 list_add_tail(&ltd->ltd_layout_phase_list, head);
1333                 atomic_inc(&ltd->ltd_ref);
1334                 laia->laia_ltd = ltd;
1335                 spin_unlock(&ltds->ltd_lock);
1336                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1337                                          lfsck_layout_master_async_interpret,
1338                                          laia, LFSCK_QUERY);
1339                 if (rc != 0) {
1340                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
1341                                lfsck_lfsck2name(lfsck),
1342                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1343                                ltd->ltd_index, rc);
1344                         lfsck_tgt_put(ltd);
1345                         rc1 = rc;
1346                 }
1347                 spin_lock(&ltds->ltd_lock);
1348         }
1349         spin_unlock(&ltds->ltd_lock);
1350
1351         rc = ptlrpc_set_wait(set);
1352         if (rc < 0) {
1353                 ptlrpc_set_destroy(set);
1354                 RETURN(rc);
1355         }
1356
1357         if (!(lr->lr_flags & LEF_TO_OST) &&
1358             list_empty(&llmd->llmd_mdt_phase1_list))
1359                 goto again;
1360
1361         ptlrpc_set_destroy(set);
1362
1363         RETURN(rc1 != 0 ? rc1 : rc);
1364 }
1365
1366 static inline bool
1367 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1368 {
1369         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1370                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1371                 list_empty(&llmd->llmd_ost_phase1_list));
1372 }
1373
1374 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1375                                              struct lfsck_component *com,
1376                                              struct lfsck_request *lr)
1377 {
1378         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1379         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1380         struct lfsck_instance             *lfsck = com->lc_lfsck;
1381         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1382         struct lfsck_layout               *lo    = com->lc_file_ram;
1383         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1384         struct ptlrpc_request_set         *set;
1385         struct lfsck_tgt_descs            *ltds;
1386         struct lfsck_tgt_desc             *ltd;
1387         struct lfsck_tgt_desc             *next;
1388         struct list_head                  *head;
1389         __u32                              idx;
1390         int                                rc    = 0;
1391         ENTRY;
1392
1393         set = ptlrpc_prep_set();
1394         if (set == NULL)
1395                 RETURN(-ENOMEM);
1396
1397         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1398         lr->lr_active = LT_LAYOUT;
1399         laia->laia_com = com;
1400         laia->laia_lr = lr;
1401         laia->laia_shared = 0;
1402         switch (lr->lr_event) {
1403         case LE_START:
1404                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1405                 ltds = &lfsck->li_ost_descs;
1406                 laia->laia_ltds = ltds;
1407                 down_read(&ltds->ltd_rw_sem);
1408                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1409                         ltd = lfsck_tgt_get(ltds, idx);
1410                         LASSERT(ltd != NULL);
1411
1412                         laia->laia_ltd = ltd;
1413                         ltd->ltd_layout_done = 0;
1414                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1415                                         lfsck_layout_master_async_interpret,
1416                                         laia, LFSCK_NOTIFY);
1417                         if (rc != 0) {
1418                                 CERROR("%s: fail to notify %s %x for layout "
1419                                        "start: rc = %d\n",
1420                                        lfsck_lfsck2name(lfsck),
1421                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1422                                        "MDT", idx, rc);
1423                                 lfsck_tgt_put(ltd);
1424                                 lo->ll_flags |= LF_INCOMPLETE;
1425                         }
1426                 }
1427                 up_read(&ltds->ltd_rw_sem);
1428
1429                 /* Sync up */
1430                 rc = ptlrpc_set_wait(set);
1431                 if (rc < 0) {
1432                         ptlrpc_set_destroy(set);
1433                         RETURN(rc);
1434                 }
1435
1436                 if (!(bk->lb_param & LPF_ALL_TGT))
1437                         break;
1438
1439                 /* link other MDT targets locallly. */
1440                 spin_lock(&ltds->ltd_lock);
1441                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1442                         ltd = LTD_TGT(ltds, idx);
1443                         LASSERT(ltd != NULL);
1444
1445                         if (!list_empty(&ltd->ltd_layout_list))
1446                                 continue;
1447
1448                         list_add_tail(&ltd->ltd_layout_list,
1449                                       &llmd->llmd_mdt_list);
1450                         list_add_tail(&ltd->ltd_layout_phase_list,
1451                                       &llmd->llmd_mdt_phase1_list);
1452                 }
1453                 spin_unlock(&ltds->ltd_lock);
1454                 break;
1455         case LE_STOP:
1456         case LE_PHASE2_DONE:
1457         case LE_PEER_EXIT: {
1458                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1459                 if (bk->lb_param & LPF_ALL_TGT) {
1460                         head = &llmd->llmd_mdt_list;
1461                         ltds = &lfsck->li_mdt_descs;
1462                         if (lr->lr_event == LE_STOP) {
1463                                 /* unlink other MDT targets locallly. */
1464                                 spin_lock(&ltds->ltd_lock);
1465                                 list_for_each_entry_safe(ltd, next, head,
1466                                                          ltd_layout_list) {
1467                                         list_del_init(&ltd->ltd_layout_phase_list);
1468                                         list_del_init(&ltd->ltd_layout_list);
1469                                 }
1470                                 spin_unlock(&ltds->ltd_lock);
1471
1472                                 lr->lr_flags |= LEF_TO_OST;
1473                                 head = &llmd->llmd_ost_list;
1474                                 ltds = &lfsck->li_ost_descs;
1475                         } else {
1476                                 lr->lr_flags &= ~LEF_TO_OST;
1477                         }
1478                 } else {
1479                         lr->lr_flags |= LEF_TO_OST;
1480                         head = &llmd->llmd_ost_list;
1481                         ltds = &lfsck->li_ost_descs;
1482                 }
1483
1484 again:
1485                 laia->laia_ltds = ltds;
1486                 spin_lock(&ltds->ltd_lock);
1487                 while (!list_empty(head)) {
1488                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1489                                          ltd_layout_list);
1490                         if (!list_empty(&ltd->ltd_layout_phase_list))
1491                                 list_del_init(&ltd->ltd_layout_phase_list);
1492                         list_del_init(&ltd->ltd_layout_list);
1493                         atomic_inc(&ltd->ltd_ref);
1494                         laia->laia_ltd = ltd;
1495                         spin_unlock(&ltds->ltd_lock);
1496                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1497                                         lfsck_layout_master_async_interpret,
1498                                         laia, LFSCK_NOTIFY);
1499                         if (rc != 0) {
1500                                 CERROR("%s: fail to notify %s %x for layout "
1501                                        "stop/phase2: rc = %d\n",
1502                                        lfsck_lfsck2name(lfsck),
1503                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1504                                        "MDT", ltd->ltd_index, rc);
1505                                 lfsck_tgt_put(ltd);
1506                         }
1507                         spin_lock(&ltds->ltd_lock);
1508                 }
1509                 spin_unlock(&ltds->ltd_lock);
1510
1511                 rc = ptlrpc_set_wait(set);
1512                 if (rc < 0) {
1513                         ptlrpc_set_destroy(set);
1514                         RETURN(rc);
1515                 }
1516
1517                 if (!(lr->lr_flags & LEF_TO_OST)) {
1518                         lr->lr_flags |= LEF_TO_OST;
1519                         head = &llmd->llmd_ost_list;
1520                         ltds = &lfsck->li_ost_descs;
1521                         goto again;
1522                 }
1523                 break;
1524         }
1525         case LE_PHASE1_DONE:
1526                 llmd->llmd_touch_gen++;
1527                 ltds = &lfsck->li_mdt_descs;
1528                 laia->laia_ltds = ltds;
1529                 spin_lock(&ltds->ltd_lock);
1530                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1531                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1532                                          struct lfsck_tgt_desc,
1533                                          ltd_layout_phase_list);
1534                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1535                                 break;
1536
1537                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1538                         list_del_init(&ltd->ltd_layout_phase_list);
1539                         list_add_tail(&ltd->ltd_layout_phase_list,
1540                                       &llmd->llmd_mdt_phase1_list);
1541                         atomic_inc(&ltd->ltd_ref);
1542                         laia->laia_ltd = ltd;
1543                         spin_unlock(&ltds->ltd_lock);
1544                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1545                                         lfsck_layout_master_async_interpret,
1546                                         laia, LFSCK_NOTIFY);
1547                         if (rc != 0) {
1548                                 CERROR("%s: fail to notify MDT %x for layout "
1549                                        "phase1 done: rc = %d\n",
1550                                        lfsck_lfsck2name(lfsck),
1551                                        ltd->ltd_index, rc);
1552                                 lfsck_tgt_put(ltd);
1553                         }
1554                         spin_lock(&ltds->ltd_lock);
1555                 }
1556                 spin_unlock(&ltds->ltd_lock);
1557                 break;
1558         default:
1559                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1560                        lfsck_lfsck2name(lfsck), lr->lr_event);
1561                 rc = -EINVAL;
1562                 break;
1563         }
1564
1565         rc = ptlrpc_set_wait(set);
1566         ptlrpc_set_destroy(set);
1567
1568         RETURN(rc);
1569 }
1570
1571 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1572                                            struct lfsck_component *com,
1573                                            int rc)
1574 {
1575         struct lfsck_instance   *lfsck = com->lc_lfsck;
1576         struct lfsck_layout     *lo    = com->lc_file_ram;
1577         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1578
1579         down_write(&com->lc_sem);
1580
1581         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1582                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1583         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1584         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1585
1586         if (rc > 0) {
1587                 com->lc_journal = 0;
1588                 if (lo->ll_flags & LF_INCOMPLETE)
1589                         lo->ll_status = LS_PARTIAL;
1590                 else
1591                         lo->ll_status = LS_COMPLETED;
1592                 if (!(bk->lb_param & LPF_DRYRUN))
1593                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1594                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1595                 lo->ll_success_count++;
1596         } else if (rc == 0) {
1597                 lo->ll_status = lfsck->li_status;
1598                 if (lo->ll_status == 0)
1599                         lo->ll_status = LS_STOPPED;
1600         } else {
1601                 lo->ll_status = LS_FAILED;
1602         }
1603
1604         if (lo->ll_status != LS_PAUSED) {
1605                 spin_lock(&lfsck->li_lock);
1606                 list_del_init(&com->lc_link);
1607                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1608                 spin_unlock(&lfsck->li_lock);
1609         }
1610
1611         rc = lfsck_layout_store(env, com);
1612
1613         up_write(&com->lc_sem);
1614
1615         return rc;
1616 }
1617
1618 static int lfsck_layout_lock(const struct lu_env *env,
1619                              struct lfsck_component *com,
1620                              struct dt_object *obj,
1621                              struct lustre_handle *lh, __u64 bits)
1622 {
1623         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1624         ldlm_policy_data_t              *policy = &info->lti_policy;
1625         struct ldlm_res_id              *resid  = &info->lti_resid;
1626         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1627         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1628         int                              rc;
1629
1630         LASSERT(lfsck->li_namespace != NULL);
1631
1632         memset(policy, 0, sizeof(*policy));
1633         policy->l_inodebits.bits = bits;
1634         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1635         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1636                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1637                                     ldlm_completion_ast, NULL, NULL, 0,
1638                                     LVB_T_NONE, NULL, lh);
1639         if (rc == ELDLM_OK) {
1640                 rc = 0;
1641         } else {
1642                 memset(lh, 0, sizeof(*lh));
1643                 rc = -EIO;
1644         }
1645
1646         return rc;
1647 }
1648
1649 static void lfsck_layout_unlock(struct lustre_handle *lh)
1650 {
1651         if (lustre_handle_is_used(lh)) {
1652                 ldlm_lock_decref(lh, LCK_EX);
1653                 memset(lh, 0, sizeof(*lh));
1654         }
1655 }
1656
1657 static int lfsck_layout_trans_stop(const struct lu_env *env,
1658                                    struct dt_device *dev,
1659                                    struct thandle *handle, int result)
1660 {
1661         int rc;
1662
1663         handle->th_result = result;
1664         rc = dt_trans_stop(env, dev, handle);
1665         if (rc > 0)
1666                 rc = 0;
1667         else if (rc == 0)
1668                 rc = 1;
1669
1670         return rc;
1671 }
1672
1673 /**
1674  * \retval       +1: repaired
1675  * \retval        0: did nothing
1676  * \retval      -ve: on error
1677  */
1678 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1679                                      struct thandle *handle,
1680                                      struct dt_object *parent,
1681                                      struct lu_fid *cfid,
1682                                      struct lu_buf *buf,
1683                                      struct lov_ost_data_v1 *slot,
1684                                      int fl, __u32 ost_idx)
1685 {
1686         struct ost_id   *oi     = &lfsck_env_info(env)->lti_oi;
1687         int              rc;
1688
1689         fid_to_ostid(cfid, oi);
1690         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1691         slot->l_ost_gen = cpu_to_le32(0);
1692         slot->l_ost_idx = cpu_to_le32(ost_idx);
1693         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
1694                           BYPASS_CAPA);
1695         if (rc == 0)
1696                 rc = 1;
1697
1698         return rc;
1699 }
1700
1701 /**
1702  * \retval       +1: repaired
1703  * \retval        0: did nothing
1704  * \retval      -ve: on error
1705  */
1706 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1707                                      struct thandle *handle,
1708                                      struct dt_object *parent,
1709                                      struct lu_fid *cfid,
1710                                      struct lu_buf *buf, int fl,
1711                                      __u32 ost_idx, __u32 ea_off)
1712 {
1713         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1714         struct lov_ost_data_v1  *objs;
1715         int                      rc;
1716         ENTRY;
1717
1718         if (fl == LU_XATTR_CREATE) {
1719                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1,
1720                                                        LOV_MAGIC_V1));
1721
1722                 memset(lmm, 0, buf->lb_len);
1723                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1724                 /* XXX: currently, we only support LOV_PATTERN_RAID0. */
1725                 lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
1726                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1727                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1728                 /* XXX: We cannot know the stripe size,
1729                  *      then use the default value (1 MB). */
1730                 lmm->lmm_stripe_size =
1731                         cpu_to_le32(LOV_DESC_STRIPE_SIZE_DEFAULT);
1732                 objs = &(lmm->lmm_objects[ea_off]);
1733         } else {
1734                 __u16   count = le16_to_cpu(lmm->lmm_stripe_count);
1735                 int     gap   = ea_off - count;
1736                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1737
1738                 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3
1739                  * which has been verified in lfsck_layout_verify_header()
1740                  * already. If some new magic introduced in the future,
1741                  * then layout LFSCK needs to be updated also. */
1742                 if (magic == LOV_MAGIC_V1) {
1743                         objs = &(lmm->lmm_objects[count]);
1744                 } else {
1745                         LASSERT(magic == LOV_MAGIC_V3);
1746                         objs = &((struct lov_mds_md_v3 *)lmm)->
1747                                                         lmm_objects[count];
1748                 }
1749
1750                 if (gap > 0)
1751                         memset(objs, 0, gap * sizeof(*objs));
1752                 lmm->lmm_layout_gen =
1753                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1754                 objs += gap;
1755
1756                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1, magic));
1757         }
1758
1759         lmm->lmm_stripe_count = cpu_to_le16(ea_off + 1);
1760         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1761                                        fl, ost_idx);
1762
1763         RETURN(rc);
1764 }
1765
1766 /**
1767  * \retval       +1: repaired
1768  * \retval        0: did nothing
1769  * \retval      -ve: on error
1770  */
1771 static int lfsck_layout_update_pfid(const struct lu_env *env,
1772                                     struct lfsck_component *com,
1773                                     struct dt_object *parent,
1774                                     struct lu_fid *cfid,
1775                                     struct dt_device *cdev, __u32 ea_off)
1776 {
1777         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1778         struct dt_object        *child;
1779         struct thandle          *handle;
1780         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1781         struct lu_buf           *buf;
1782         int                      rc     = 0;
1783         ENTRY;
1784
1785         child = lfsck_object_find_by_dev(env, cdev, cfid);
1786         if (IS_ERR(child))
1787                 RETURN(PTR_ERR(child));
1788
1789         handle = dt_trans_create(env, cdev);
1790         if (IS_ERR(handle))
1791                 GOTO(out, rc = PTR_ERR(handle));
1792
1793         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1794         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1795         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1796          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1797          * parent MDT-object's layout EA. */
1798         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1799         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1800
1801         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1802         if (rc != 0)
1803                 GOTO(stop, rc);
1804
1805         rc = dt_trans_start(env, cdev, handle);
1806         if (rc != 0)
1807                 GOTO(stop, rc);
1808
1809         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1810                           BYPASS_CAPA);
1811
1812         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1813
1814 stop:
1815         dt_trans_stop(env, cdev, handle);
1816
1817 out:
1818         lu_object_put(env, &child->do_lu);
1819
1820         return rc;
1821 }
1822
1823 /**
1824  * \retval       +1: repaired
1825  * \retval        0: did nothing
1826  * \retval      -ve: on error
1827  */
1828 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1829                                         struct lfsck_component *com,
1830                                         struct lfsck_tgt_desc *ltd,
1831                                         struct lu_orphan_rec *rec,
1832                                         struct lu_fid *cfid,
1833                                         const char *prefix,
1834                                         const char *postfix,
1835                                         __u32 ea_off)
1836 {
1837         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1838         char                            *name   = info->lti_key;
1839         struct lu_attr                  *la     = &info->lti_la;
1840         struct dt_object_format         *dof    = &info->lti_dof;
1841         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1842         struct lu_fid                   *pfid   = &rec->lor_fid;
1843         struct lu_fid                   *tfid   = &info->lti_fid3;
1844         struct dt_device                *next   = lfsck->li_next;
1845         struct dt_object                *pobj   = NULL;
1846         struct dt_object                *cobj   = NULL;
1847         struct thandle                  *th     = NULL;
1848         struct lu_buf                   *pbuf   = NULL;
1849         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1850         struct lustre_handle             lh     = { 0 };
1851         int                              buflen = ea_buf->lb_len;
1852         int                              idx    = 0;
1853         int                              rc     = 0;
1854         ENTRY;
1855
1856         /* Create .lustre/lost+found/MDTxxxx when needed. */
1857         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1858                 rc = lfsck_create_lpf(env, lfsck);
1859                 if (rc != 0)
1860                         RETURN(rc);
1861         }
1862
1863         if (fid_is_zero(pfid)) {
1864                 struct filter_fid *ff = &info->lti_new_pfid;
1865
1866                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1867                 if (rc != 0)
1868                         RETURN(rc);
1869
1870                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1871                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1872                 /* Currently, the filter_fid::ff_parent::f_ver is not the
1873                  * real parent MDT-object's FID::f_ver, instead it is the
1874                  * OST-object index in its parent MDT-object's layout EA. */
1875                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1876                 pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
1877                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1878                 if (IS_ERR(cobj))
1879                         RETURN(PTR_ERR(cobj));
1880         }
1881
1882         CDEBUG(D_LFSCK, "Re-create the lost MDT-object: parent "
1883                DFID", child "DFID", OST-index %u, stripe-index %u, "
1884                "prefix %s, postfix %s\n",
1885                PFID(pfid), PFID(cfid), ltd->ltd_index, ea_off, prefix, postfix);
1886
1887         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1888         if (IS_ERR(pobj))
1889                 GOTO(put, rc = PTR_ERR(pobj));
1890
1891         LASSERT(prefix != NULL);
1892         LASSERT(postfix != NULL);
1893
1894         /** name rules:
1895          *
1896          *  1. Use the MDT-object's FID as the name with prefix and postfix.
1897          *
1898          *  1.1 prefix "C-":    More than one OST-objects claim the same
1899          *                      MDT-object and the same slot in the layout EA.
1900          *                      It may be created for dangling referenced MDT
1901          *                      object or may be not.
1902          *  1.2 prefix "N-":    The orphan OST-object does not know which one
1903          *                      is the real parent, so the LFSCK assign a new
1904          *                      FID as its parent.
1905          *  1.3 prefix "R-":    The orphan OST-object know its parent FID but
1906          *                      does not know the position in the namespace.
1907          *
1908          *  2. If there is name conflict, append more index for new name. */
1909         sprintf(name, "%s"DFID"%s", prefix, PFID(pfid), postfix);
1910         do {
1911                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1912                                (const struct dt_key *)name, BYPASS_CAPA);
1913                 if (rc != 0 && rc != -ENOENT)
1914                         GOTO(put, rc);
1915
1916                 if (unlikely(rc == 0)) {
1917                         CWARN("%s: The name %s under lost+found has been used "
1918                               "by the "DFID". Try to increase the FID version "
1919                               "for the new file name.\n",
1920                               lfsck_lfsck2name(lfsck), name, PFID(tfid));
1921                         sprintf(name, "%s"DFID"%s-%d", prefix, PFID(pfid),
1922                                 postfix, ++idx);
1923                 }
1924         } while (rc == 0);
1925
1926         memset(la, 0, sizeof(*la));
1927         la->la_uid = rec->lor_uid;
1928         la->la_gid = rec->lor_gid;
1929         la->la_mode = S_IFREG | S_IRUSR | S_IWUSR;
1930         la->la_valid = LA_MODE | LA_UID | LA_GID;
1931
1932         memset(dof, 0, sizeof(*dof));
1933         dof->dof_type = dt_mode_to_dft(S_IFREG);
1934
1935         rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1936         if (buflen < rc) {
1937                 lu_buf_realloc(ea_buf, rc);
1938                 buflen = ea_buf->lb_len;
1939                 if (ea_buf->lb_buf == NULL)
1940                         GOTO(put, rc = -ENOMEM);
1941         } else {
1942                 ea_buf->lb_len = rc;
1943         }
1944
1945         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
1946          *
1947          * XXX: Currently, we do not grab the PDO lock as normal create cases,
1948          *      because creating MDT-object for orphan OST-object is rare, we
1949          *      do not much care about the performance. It can be improved in
1950          *      the future when needed. */
1951         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
1952                                MDS_INODELOCK_UPDATE);
1953         if (rc != 0)
1954                 GOTO(put, rc);
1955
1956         th = dt_trans_create(env, next);
1957         if (IS_ERR(th))
1958                 GOTO(unlock, rc = PTR_ERR(th));
1959
1960         /* 1a. Update OST-object's parent information remotely.
1961          *
1962          * If other subsequent modifications failed, then next LFSCK scanning
1963          * will process the OST-object as orphan again with known parent FID. */
1964         if (cobj != NULL) {
1965                 rc = dt_declare_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th);
1966                 if (rc != 0)
1967                         GOTO(stop, rc);
1968         }
1969
1970         /* 2a. Create the MDT-object locally. */
1971         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
1972         if (rc != 0)
1973                 GOTO(stop, rc);
1974
1975         /* 3a. Add layout EA for the MDT-object. */
1976         rc = dt_declare_xattr_set(env, pobj, ea_buf, XATTR_NAME_LOV,
1977                                   LU_XATTR_CREATE, th);
1978         if (rc != 0)
1979                 GOTO(stop, rc);
1980
1981         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1982         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
1983                                (const struct dt_rec *)pfid,
1984                                (const struct dt_key *)name, th);
1985         if (rc != 0)
1986                 GOTO(stop, rc);
1987
1988         rc = dt_trans_start(env, next, th);
1989         if (rc != 0)
1990                 GOTO(stop, rc);
1991
1992         /* 1b. Update OST-object's parent information remotely. */
1993         if (cobj != NULL) {
1994                 rc = dt_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th,
1995                                   BYPASS_CAPA);
1996                 if (rc != 0)
1997                         GOTO(stop, rc);
1998         }
1999
2000         dt_write_lock(env, pobj, 0);
2001         /* 2b. Create the MDT-object locally. */
2002         rc = dt_create(env, pobj, la, NULL, dof, th);
2003         if (rc == 0)
2004                 /* 3b. Add layout EA for the MDT-object. */
2005                 rc = lfsck_layout_extend_lovea(env, th, pobj, cfid, ea_buf,
2006                                                LU_XATTR_CREATE, ltd->ltd_index,
2007                                                ea_off);
2008         dt_write_unlock(env, pobj);
2009         if (rc < 0)
2010                 GOTO(stop, rc);
2011
2012         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2013         rc = dt_insert(env, lfsck->li_lpf_obj,
2014                        (const struct dt_rec *)pfid,
2015                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2016
2017         GOTO(stop, rc);
2018
2019 stop:
2020         dt_trans_stop(env, next, th);
2021
2022 unlock:
2023         lfsck_layout_unlock(&lh);
2024
2025 put:
2026         if (cobj != NULL && !IS_ERR(cobj))
2027                 lu_object_put(env, &cobj->do_lu);
2028         if (pobj != NULL && !IS_ERR(pobj))
2029                 lu_object_put(env, &pobj->do_lu);
2030         ea_buf->lb_len = buflen;
2031
2032         return rc >= 0 ? 1 : rc;
2033 }
2034
2035 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2036                                                    struct lfsck_component *com,
2037                                                    const struct lu_fid *fid,
2038                                                    __u32 index)
2039 {
2040         struct lfsck_thread_info *info  = lfsck_env_info(env);
2041         struct lfsck_request     *lr    = &info->lti_lr;
2042         struct lfsck_instance    *lfsck = com->lc_lfsck;
2043         struct lfsck_tgt_desc    *ltd;
2044         struct ptlrpc_request    *req;
2045         struct lfsck_request     *tmp;
2046         struct obd_export        *exp;
2047         int                       rc    = 0;
2048         ENTRY;
2049
2050         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2051         if (unlikely(ltd == NULL))
2052                 RETURN(-ENODEV);
2053
2054         exp = ltd->ltd_exp;
2055         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2056                 GOTO(put, rc = -EOPNOTSUPP);
2057
2058         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2059         if (req == NULL)
2060                 GOTO(put, rc = -ENOMEM);
2061
2062         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2063         if (rc != 0) {
2064                 ptlrpc_request_free(req);
2065
2066                 GOTO(put, rc);
2067         }
2068
2069         memset(lr, 0, sizeof(*lr));
2070         lr->lr_event = LE_CONDITIONAL_DESTROY;
2071         lr->lr_active = LT_LAYOUT;
2072         lr->lr_fid = *fid;
2073
2074         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2075         *tmp = *lr;
2076         ptlrpc_request_set_replen(req);
2077
2078         rc = ptlrpc_queue_wait(req);
2079         ptlrpc_req_finished(req);
2080
2081         GOTO(put, rc);
2082
2083 put:
2084         lfsck_tgt_put(ltd);
2085
2086         return rc;
2087 }
2088
2089 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2090                                                   struct lfsck_component *com,
2091                                                   struct lfsck_request *lr)
2092 {
2093         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2094         struct lu_attr                  *la     = &info->lti_la;
2095         ldlm_policy_data_t              *policy = &info->lti_policy;
2096         struct ldlm_res_id              *resid  = &info->lti_resid;
2097         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2098         struct dt_device                *dev    = lfsck->li_bottom;
2099         struct lu_fid                   *fid    = &lr->lr_fid;
2100         struct dt_object                *obj;
2101         struct thandle                  *th     = NULL;
2102         struct lustre_handle             lh     = { 0 };
2103         __u64                            flags  = 0;
2104         int                              rc     = 0;
2105         ENTRY;
2106
2107         obj = lfsck_object_find_by_dev(env, dev, fid);
2108         if (IS_ERR(obj))
2109                 RETURN(PTR_ERR(obj));
2110
2111         dt_read_lock(env, obj, 0);
2112         if (dt_object_exists(obj) == 0) {
2113                 dt_read_unlock(env, obj);
2114
2115                 GOTO(put, rc = -ENOENT);
2116         }
2117
2118         /* Get obj's attr without lock firstly. */
2119         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2120         dt_read_unlock(env, obj);
2121         if (rc != 0)
2122                 GOTO(put, rc);
2123
2124         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2125                 GOTO(put, rc = -ETXTBSY);
2126
2127         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2128         LASSERT(lfsck->li_namespace != NULL);
2129
2130         memset(policy, 0, sizeof(*policy));
2131         policy->l_extent.end = OBD_OBJECT_EOF;
2132         ost_fid_build_resid(fid, resid);
2133         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2134                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2135                                     ldlm_completion_ast, NULL, NULL, 0,
2136                                     LVB_T_NONE, NULL, &lh);
2137         if (rc != ELDLM_OK)
2138                 GOTO(put, rc = -EIO);
2139
2140         dt_write_lock(env, obj, 0);
2141         /* Get obj's attr within lock again. */
2142         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2143         if (rc != 0)
2144                 GOTO(unlock, rc);
2145
2146         if (la->la_ctime != 0)
2147                 GOTO(unlock, rc = -ETXTBSY);
2148
2149         th = dt_trans_create(env, dev);
2150         if (IS_ERR(th))
2151                 GOTO(unlock, rc = PTR_ERR(th));
2152
2153         rc = dt_declare_ref_del(env, obj, th);
2154         if (rc != 0)
2155                 GOTO(stop, rc);
2156
2157         rc = dt_declare_destroy(env, obj, th);
2158         if (rc != 0)
2159                 GOTO(stop, rc);
2160
2161         rc = dt_trans_start_local(env, dev, th);
2162         if (rc != 0)
2163                 GOTO(stop, rc);
2164
2165         rc = dt_ref_del(env, obj, th);
2166         if (rc != 0)
2167                 GOTO(stop, rc);
2168
2169         rc = dt_destroy(env, obj, th);
2170         if (rc == 0)
2171                 CDEBUG(D_LFSCK, "Destroy the empty OST-object "DFID" which "
2172                        "was created for reparing dangling referenced case. "
2173                        "But the original missed OST-object is found now.\n",
2174                        PFID(fid));
2175
2176         GOTO(stop, rc);
2177
2178 stop:
2179         dt_trans_stop(env, dev, th);
2180
2181 unlock:
2182         dt_write_unlock(env, obj);
2183         ldlm_lock_decref(&lh, LCK_EX);
2184
2185 put:
2186         lu_object_put(env, &obj->do_lu);
2187
2188         return rc;
2189 }
2190
2191 /**
2192  * Some OST-object has occupied the specified layout EA slot.
2193  * Such OST-object may be generated by the LFSCK when repair
2194  * dangling referenced MDT-object, which can be indicated by
2195  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2196  * is true and such OST-object has not been modified yet, we
2197  * will replace it with the orphan OST-object; otherwise the
2198  * LFSCK will create new MDT-object to reference the orphan.
2199  *
2200  * \retval       +1: repaired
2201  * \retval        0: did nothing
2202  * \retval      -ve: on error
2203  */
2204 static int lfsck_layout_conflict_create(const struct lu_env *env,
2205                                         struct lfsck_component *com,
2206                                         struct lfsck_tgt_desc *ltd,
2207                                         struct lu_orphan_rec *rec,
2208                                         struct dt_object *parent,
2209                                         struct lu_fid *cfid,
2210                                         struct lu_buf *ea_buf,
2211                                         struct lov_ost_data_v1 *slot,
2212                                         __u32 ea_off, __u32 ori_len)
2213 {
2214         struct lfsck_thread_info *info          = lfsck_env_info(env);
2215         struct lu_fid            *cfid2         = &info->lti_fid2;
2216         struct ost_id            *oi            = &info->lti_oi;
2217         char                     *postfix       = info->lti_tmpbuf;
2218         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2219         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2220         struct thandle           *th            = NULL;
2221         struct lustre_handle      lh            = { 0 };
2222         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2223         int                       rc            = 0;
2224         ENTRY;
2225
2226         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2227         ostid_to_fid(cfid2, oi, ost_idx2);
2228
2229         CDEBUG(D_LFSCK, "Handle layout EA conflict: parent "DFID
2230                ", cur-child "DFID" on the OST %u, orphan-child "
2231                DFID" on the OST %u, stripe-index %u\n",
2232                PFID(lfsck_dto2fid(parent)), PFID(cfid2), ost_idx2,
2233                PFID(cfid), ltd->ltd_index, ea_off);
2234
2235         /* Hold layout lock on the parent to prevent others to access. */
2236         rc = lfsck_layout_lock(env, com, parent, &lh,
2237                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2238         if (rc != 0)
2239                 GOTO(out, rc);
2240
2241         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2242
2243         /* If the conflict OST-obejct is not created for fixing dangling
2244          * referenced MDT-object in former LFSCK check/repair, or it has
2245          * been modified by others, then we cannot destroy it. Re-create
2246          * a new MDT-object for the orphan OST-object. */
2247         if (rc == -ETXTBSY) {
2248                 /* No need the layout lock on the original parent. */
2249                 lfsck_layout_unlock(&lh);
2250                 ea_buf->lb_len = ori_len;
2251
2252                 fid_zero(&rec->lor_fid);
2253                 snprintf(postfix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2254                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2255                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2256                                                   "C-", postfix, ea_off);
2257
2258                 RETURN(rc);
2259         }
2260
2261         if (rc != 0 && rc != -ENOENT)
2262                 GOTO(unlock, rc);
2263
2264         th = dt_trans_create(env, dev);
2265         if (IS_ERR(th))
2266                 GOTO(unlock, rc = PTR_ERR(th));
2267
2268         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2269                                   LU_XATTR_REPLACE, th);
2270         if (rc != 0)
2271                 GOTO(stop, rc);
2272
2273         rc = dt_trans_start_local(env, dev, th);
2274         if (rc != 0)
2275                 GOTO(stop, rc);
2276
2277         dt_write_lock(env, parent, 0);
2278         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2279         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2280                                        LU_XATTR_REPLACE, ltd->ltd_index);
2281         dt_write_unlock(env, parent);
2282
2283         GOTO(stop, rc);
2284
2285 stop:
2286         dt_trans_stop(env, dev, th);
2287
2288 unlock:
2289         lfsck_layout_unlock(&lh);
2290
2291 out:
2292         ea_buf->lb_len = ori_len;
2293
2294         return rc >= 0 ? 1 : rc;
2295 }
2296
2297 /**
2298  * \retval       +1: repaired
2299  * \retval        0: did nothing
2300  * \retval      -ve: on error
2301  */
2302 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2303                                        struct lfsck_component *com,
2304                                        struct lfsck_tgt_desc *ltd,
2305                                        struct lu_orphan_rec *rec,
2306                                        struct dt_object *parent,
2307                                        struct lu_fid *cfid,
2308                                        __u32 ost_idx, __u32 ea_off)
2309 {
2310         struct lfsck_thread_info *info          = lfsck_env_info(env);
2311         struct lu_buf            *buf           = &info->lti_big_buf;
2312         struct lu_fid            *fid           = &info->lti_fid2;
2313         struct ost_id            *oi            = &info->lti_oi;
2314         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2315         struct dt_device         *dt            = lfsck->li_bottom;
2316         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2317         struct thandle            *handle       = NULL;
2318         size_t                    buflen        = buf->lb_len;
2319         struct lov_mds_md_v1     *lmm;
2320         struct lov_ost_data_v1   *objs;
2321         struct lustre_handle      lh            = { 0 };
2322         __u32                     magic;
2323         int                       fl            = 0;
2324         int                       rc            = 0;
2325         int                       rc1;
2326         int                       i;
2327         __u16                     count;
2328         bool                      locked        = false;
2329         ENTRY;
2330
2331         CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent "
2332                DFID", child "DFID", OST-index %u, stripe-index %u\n",
2333                PFID(lfsck_dto2fid(parent)), PFID(cfid), ost_idx, ea_off);
2334
2335         rc = lfsck_layout_lock(env, com, parent, &lh,
2336                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2337         if (rc != 0)
2338                 RETURN(rc);
2339
2340 again:
2341         if (locked) {
2342                 dt_write_unlock(env, parent);
2343                 locked = false;
2344         }
2345
2346         if (handle != NULL) {
2347                 dt_trans_stop(env, dt, handle);
2348                 handle = NULL;
2349         }
2350
2351         if (rc < 0)
2352                 GOTO(unlock_layout, rc);
2353
2354         if (buf->lb_len < rc) {
2355                 lu_buf_realloc(buf, rc);
2356                 buflen = buf->lb_len;
2357                 if (buf->lb_buf == NULL)
2358                         GOTO(unlock_layout, rc = -ENOMEM);
2359         }
2360
2361         if (!(bk->lb_param & LPF_DRYRUN)) {
2362                 handle = dt_trans_create(env, dt);
2363                 if (IS_ERR(handle))
2364                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2365
2366                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2367                                           fl, handle);
2368                 if (rc != 0)
2369                         GOTO(stop, rc);
2370
2371                 rc = dt_trans_start_local(env, dt, handle);
2372                 if (rc != 0)
2373                         GOTO(stop, rc);
2374         }
2375
2376         dt_write_lock(env, parent, 0);
2377         locked = true;
2378         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2379         if (rc == -ERANGE) {
2380                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2381                                   BYPASS_CAPA);
2382                 LASSERT(rc != 0);
2383                 goto again;
2384         } else if (rc == -ENODATA || rc == 0) {
2385                 rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2386                 /* If the declared is not big enough, re-try. */
2387                 if (buf->lb_len < rc)
2388                         goto again;
2389
2390                 fl = LU_XATTR_CREATE;
2391         } else if (rc < 0) {
2392                 GOTO(unlock_parent, rc);
2393         } else if (unlikely(buf->lb_len == 0)) {
2394                 goto again;
2395         } else {
2396                 fl = LU_XATTR_REPLACE;
2397         }
2398
2399         if (fl == LU_XATTR_CREATE) {
2400                 if (bk->lb_param & LPF_DRYRUN)
2401                         GOTO(unlock_parent, rc = 1);
2402
2403                 LASSERT(buf->lb_len >= rc);
2404
2405                 buf->lb_len = rc;
2406                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
2407                                                fl, ost_idx, ea_off);
2408
2409                 GOTO(unlock_parent, rc);
2410         }
2411
2412         lmm = buf->lb_buf;
2413         rc1 = lfsck_layout_verify_header(lmm);
2414         if (rc1 != 0)
2415                 GOTO(unlock_parent, rc = rc1);
2416
2417         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2418          * been verified in lfsck_layout_verify_header() already. If some
2419          * new magic introduced in the future, then layout LFSCK needs to
2420          * be updated also. */
2421         magic = le32_to_cpu(lmm->lmm_magic);
2422         if (magic == LOV_MAGIC_V1) {
2423                 objs = &(lmm->lmm_objects[0]);
2424         } else {
2425                 LASSERT(magic == LOV_MAGIC_V3);
2426                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2427         }
2428
2429         count = le16_to_cpu(lmm->lmm_stripe_count);
2430         if (count == 0)
2431                 GOTO(unlock_parent, rc = -EINVAL);
2432         LASSERT(count > 0);
2433
2434         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2435         if (count <= ea_off) {
2436                 if (bk->lb_param & LPF_DRYRUN)
2437                         GOTO(unlock_parent, rc = 1);
2438
2439                 rc = lov_mds_md_size(ea_off + 1, magic);
2440                 /* If the declared is not big enough, re-try. */
2441                 if (buf->lb_len < rc)
2442                         goto again;
2443
2444                 buf->lb_len = rc;
2445                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
2446                                                fl, ost_idx, ea_off);
2447                 GOTO(unlock_parent, rc);
2448         }
2449
2450         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2451
2452         buf->lb_len = rc;
2453         for (i = 0; i < count; i++, objs++) {
2454                 /* The MDT-object was created via lfsck_layout_recover_create()
2455                  * by others before, and we fill the dummy layout EA. */
2456                 if (is_dummy_lov_ost_data(objs)) {
2457                         if (i != ea_off)
2458                                 continue;
2459
2460                         if (bk->lb_param & LPF_DRYRUN)
2461                                 GOTO(unlock_parent, rc = 1);
2462
2463                         lmm->lmm_layout_gen =
2464                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2465                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2466                                                        cfid, buf, objs, fl,
2467                                                        ost_idx);
2468                         GOTO(unlock_parent, rc);
2469                 }
2470
2471                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2472                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2473                 /* It should be rare case, the slot is there, but the LFSCK
2474                  * does not handle it during the first-phase cycle scanning. */
2475                 if (unlikely(lu_fid_eq(fid, cfid))) {
2476                         if (i == ea_off) {
2477                                 GOTO(unlock_parent, rc = 0);
2478                         } else {
2479                                 /* Rare case that the OST-object index
2480                                  * does not match the parent MDT-object
2481                                  * layout EA. We trust the later one. */
2482                                 if (bk->lb_param & LPF_DRYRUN)
2483                                         GOTO(unlock_parent, rc = 1);
2484
2485                                 dt_write_unlock(env, parent);
2486                                 if (handle != NULL)
2487                                         dt_trans_stop(env, dt, handle);
2488                                 lfsck_layout_unlock(&lh);
2489                                 buf->lb_len = buflen;
2490                                 rc = lfsck_layout_update_pfid(env, com, parent,
2491                                                         cfid, ltd->ltd_tgt, i);
2492
2493                                 RETURN(rc);
2494                         }
2495                 }
2496         }
2497
2498         /* The MDT-object exists, but related layout EA slot is occupied
2499          * by others. */
2500         if (bk->lb_param & LPF_DRYRUN)
2501                 GOTO(unlock_parent, rc = 1);
2502
2503         dt_write_unlock(env, parent);
2504         if (handle != NULL)
2505                 dt_trans_stop(env, dt, handle);
2506         lfsck_layout_unlock(&lh);
2507         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2508                 objs = &(lmm->lmm_objects[ea_off]);
2509         else
2510                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2511         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2512                                           buf, objs, ea_off, buflen);
2513
2514         RETURN(rc);
2515
2516 unlock_parent:
2517         if (locked)
2518                 dt_write_unlock(env, parent);
2519
2520 stop:
2521         if (handle != NULL)
2522                 dt_trans_stop(env, dt, handle);
2523
2524 unlock_layout:
2525         lfsck_layout_unlock(&lh);
2526         buf->lb_len = buflen;
2527
2528         return rc;
2529 }
2530
2531 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2532                                         struct lfsck_component *com,
2533                                         struct lfsck_tgt_desc *ltd,
2534                                         struct lu_orphan_rec *rec,
2535                                         struct lu_fid *cfid)
2536 {
2537         struct lfsck_layout     *lo     = com->lc_file_ram;
2538         struct lu_fid           *pfid   = &rec->lor_fid;
2539         struct dt_object        *parent = NULL;
2540         __u32                    ea_off = pfid->f_stripe_idx;
2541         int                      rc     = 0;
2542         ENTRY;
2543
2544         if (!fid_is_sane(cfid))
2545                 GOTO(out, rc = -EINVAL);
2546
2547         if (fid_is_zero(pfid)) {
2548                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2549                                                   "N-", "", ea_off);
2550                 GOTO(out, rc);
2551         }
2552
2553         pfid->f_ver = 0;
2554         if (!fid_is_sane(pfid))
2555                 GOTO(out, rc = -EINVAL);
2556
2557         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2558         if (IS_ERR(parent))
2559                 GOTO(out, rc = PTR_ERR(parent));
2560
2561         if (unlikely(dt_object_remote(parent) != 0))
2562                 GOTO(put, rc = -EXDEV);
2563
2564         if (dt_object_exists(parent) == 0) {
2565                 lu_object_put(env, &parent->do_lu);
2566                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2567                                                   "R-", "", ea_off);
2568                 GOTO(out, rc);
2569         }
2570
2571         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2572                 GOTO(put, rc = -EISDIR);
2573
2574         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2575                                          ltd->ltd_index, ea_off);
2576
2577         GOTO(put, rc);
2578
2579 put:
2580         if (rc <= 0)
2581                 lu_object_put(env, &parent->do_lu);
2582         else
2583                 /* The layout EA is changed, need to be reloaded next time. */
2584                 lu_object_put_nocache(env, &parent->do_lu);
2585
2586 out:
2587         down_write(&com->lc_sem);
2588         com->lc_new_scanned++;
2589         com->lc_new_checked++;
2590         if (rc > 0) {
2591                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2592                 rc = 0;
2593         } else if (rc < 0) {
2594                 lo->ll_objs_failed_phase2++;
2595         }
2596         up_write(&com->lc_sem);
2597
2598         return rc;
2599 }
2600
2601 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2602                                     struct lfsck_component *com,
2603                                     struct lfsck_tgt_desc *ltd)
2604 {
2605         struct lfsck_layout             *lo     = com->lc_file_ram;
2606         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2607         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2608         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2609         struct ost_id                   *oi     = &info->lti_oi;
2610         struct lu_fid                   *fid    = &info->lti_fid;
2611         struct dt_object                *obj;
2612         const struct dt_it_ops          *iops;
2613         struct dt_it                    *di;
2614         int                              rc     = 0;
2615         ENTRY;
2616
2617         CDEBUG(D_LFSCK, "%s: start the orphan scanning for OST%04x\n",
2618                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2619
2620         ostid_set_seq(oi, FID_SEQ_IDIF);
2621         ostid_set_id(oi, 0);
2622         ostid_to_fid(fid, oi, ltd->ltd_index);
2623         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2624         if (unlikely(IS_ERR(obj)))
2625                 RETURN(PTR_ERR(obj));
2626
2627         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2628         if (rc != 0)
2629                 GOTO(put, rc);
2630
2631         iops = &obj->do_index_ops->dio_it;
2632         di = iops->init(env, obj, 0, BYPASS_CAPA);
2633         if (IS_ERR(di))
2634                 GOTO(put, rc = PTR_ERR(di));
2635
2636         rc = iops->load(env, di, 0);
2637         if (rc == -ESRCH) {
2638                 /* -ESRCH means that the orphan OST-objects rbtree has been
2639                  * cleanup because of the OSS server restart or other errors. */
2640                 lo->ll_flags |= LF_INCOMPLETE;
2641                 GOTO(fini, rc);
2642         }
2643
2644         if (rc == 0)
2645                 rc = iops->next(env, di);
2646         else if (rc > 0)
2647                 rc = 0;
2648
2649         if (rc < 0)
2650                 GOTO(fini, rc);
2651
2652         if (rc > 0)
2653                 GOTO(fini, rc = 0);
2654
2655         do {
2656                 struct dt_key           *key;
2657                 struct lu_orphan_rec    *rec = &info->lti_rec;
2658
2659                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2660                     cfs_fail_val > 0) {
2661                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2662                         struct l_wait_info       lwi;
2663
2664                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2665                                           NULL, NULL);
2666                         l_wait_event(thread->t_ctl_waitq,
2667                                      !thread_is_running(thread),
2668                                      &lwi);
2669                 }
2670
2671                 key = iops->key(env, di);
2672                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2673                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2674                 if (rc == 0)
2675                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2676                                         &com->lc_fid_latest_scanned_phase2);
2677                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2678                         GOTO(fini, rc);
2679
2680                 lfsck_control_speed_by_self(com);
2681                 do {
2682                         rc = iops->next(env, di);
2683                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2684         } while (rc == 0);
2685
2686         GOTO(fini, rc);
2687
2688 fini:
2689         iops->put(env, di);
2690         iops->fini(env, di);
2691 put:
2692         lu_object_put(env, &obj->do_lu);
2693
2694         CDEBUG(D_LFSCK, "%s: finish the orphan scanning for OST%04x, rc = %d\n",
2695                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2696
2697         return rc > 0 ? 0 : rc;
2698 }
2699
2700 /* For the MDT-object with dangling reference, we need to re-create
2701  * the missed OST-object with the known FID/owner information. */
2702 static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
2703                                         struct lfsck_component *com,
2704                                         struct lfsck_layout_req *llr,
2705                                         struct lu_attr *la)
2706 {
2707         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2708         struct filter_fid               *pfid   = &info->lti_new_pfid;
2709         struct dt_allocation_hint       *hint   = &info->lti_hint;
2710         struct dt_object                *parent = llr->llr_parent->llo_obj;
2711         struct dt_object                *child  = llr->llr_child;
2712         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2713         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2714         struct thandle                  *handle;
2715         struct lu_buf                   *buf;
2716         struct lustre_handle             lh     = { 0 };
2717         int                              rc;
2718         ENTRY;
2719
2720         CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
2721                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2722                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2723                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
2724
2725         rc = lfsck_layout_lock(env, com, parent, &lh,
2726                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2727         if (rc != 0)
2728                 RETURN(rc);
2729
2730         handle = dt_trans_create(env, dev);
2731         if (IS_ERR(handle))
2732                 GOTO(unlock1, rc = PTR_ERR(handle));
2733
2734         hint->dah_parent = NULL;
2735         hint->dah_mode = 0;
2736         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2737         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2738         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2739          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2740          * parent MDT-object's layout EA. */
2741         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2742         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2743
2744         rc = dt_declare_create(env, child, la, hint, NULL, handle);
2745         if (rc != 0)
2746                 GOTO(stop, rc);
2747
2748         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2749                                   LU_XATTR_CREATE, handle);
2750         if (rc != 0)
2751                 GOTO(stop, rc);
2752
2753         rc = dt_trans_start(env, dev, handle);
2754         if (rc != 0)
2755                 GOTO(stop, rc);
2756
2757         dt_read_lock(env, parent, 0);
2758         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2759                 GOTO(unlock2, rc = 1);
2760
2761         rc = dt_create(env, child, la, hint, NULL, handle);
2762         if (rc != 0)
2763                 GOTO(unlock2, rc);
2764
2765         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2766                           handle, BYPASS_CAPA);
2767
2768         GOTO(unlock2, rc);
2769
2770 unlock2:
2771         dt_read_unlock(env, parent);
2772
2773 stop:
2774         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2775
2776 unlock1:
2777         lfsck_layout_unlock(&lh);
2778
2779         return rc;
2780 }
2781
2782 /* If the OST-object does not recognize the MDT-object as its parent, and
2783  * there is no other MDT-object claims as its parent, then just trust the
2784  * given MDT-object as its parent. So update the OST-object filter_fid. */
2785 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2786                                               struct lfsck_component *com,
2787                                               struct lfsck_layout_req *llr,
2788                                               const struct lu_attr *pla)
2789 {
2790         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2791         struct filter_fid               *pfid   = &info->lti_new_pfid;
2792         struct lu_attr                  *tla    = &info->lti_la3;
2793         struct dt_object                *parent = llr->llr_parent->llo_obj;
2794         struct dt_object                *child  = llr->llr_child;
2795         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2796         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2797         struct thandle                  *handle;
2798         struct lu_buf                   *buf;
2799         struct lustre_handle             lh     = { 0 };
2800         int                              rc;
2801         ENTRY;
2802
2803         CDEBUG(D_LFSCK, "Repair unmatched MDT-OST pair for: parent "DFID
2804                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2805                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2806                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
2807
2808         rc = lfsck_layout_lock(env, com, parent, &lh,
2809                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2810         if (rc != 0)
2811                 RETURN(rc);
2812
2813         handle = dt_trans_create(env, dev);
2814         if (IS_ERR(handle))
2815                 GOTO(unlock1, rc = PTR_ERR(handle));
2816
2817         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2818         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2819         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2820          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2821          * parent MDT-object's layout EA. */
2822         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2823         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2824
2825         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
2826         if (rc != 0)
2827                 GOTO(stop, rc);
2828
2829         tla->la_valid = LA_UID | LA_GID;
2830         tla->la_uid = pla->la_uid;
2831         tla->la_gid = pla->la_gid;
2832         rc = dt_declare_attr_set(env, child, tla, handle);
2833         if (rc != 0)
2834                 GOTO(stop, rc);
2835
2836         rc = dt_trans_start(env, dev, handle);
2837         if (rc != 0)
2838                 GOTO(stop, rc);
2839
2840         dt_write_lock(env, parent, 0);
2841         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2842                 GOTO(unlock2, rc = 1);
2843
2844         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2845                           BYPASS_CAPA);
2846         if (rc != 0)
2847                 GOTO(unlock2, rc);
2848
2849         /* Get the latest parent's owner. */
2850         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2851         if (rc != 0)
2852                 GOTO(unlock2, rc);
2853
2854         tla->la_valid = LA_UID | LA_GID;
2855         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2856
2857         GOTO(unlock2, rc);
2858
2859 unlock2:
2860         dt_write_unlock(env, parent);
2861
2862 stop:
2863         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2864
2865 unlock1:
2866         lfsck_layout_unlock(&lh);
2867
2868         return rc;
2869 }
2870
2871 /* If there are more than one MDT-objects claim as the OST-object's parent,
2872  * and the OST-object only recognizes one of them, then we need to generate
2873  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
2874 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2875                                                    struct lfsck_component *com,
2876                                                    struct lfsck_layout_req *llr,
2877                                                    struct lu_attr *la,
2878                                                    struct lu_buf *buf)
2879 {
2880         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2881         struct dt_allocation_hint       *hint   = &info->lti_hint;
2882         struct dt_object_format         *dof    = &info->lti_dof;
2883         struct dt_device                *pdev   = com->lc_lfsck->li_next;
2884         struct ost_id                   *oi     = &info->lti_oi;
2885         struct dt_object                *parent = llr->llr_parent->llo_obj;
2886         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
2887         struct dt_object                *child  = NULL;
2888         struct lu_device                *d      = &cdev->dd_lu_dev;
2889         struct lu_object                *o      = NULL;
2890         struct thandle                  *handle;
2891         struct lov_mds_md_v1            *lmm;
2892         struct lov_ost_data_v1          *objs;
2893         struct lustre_handle             lh     = { 0 };
2894         __u32                            magic;
2895         int                              rc;
2896         ENTRY;
2897
2898         CDEBUG(D_LFSCK, "Repair multiple references for: parent "DFID
2899                ", OST-index %u, stripe-index %u, owner %u:%u\n",
2900                PFID(lfsck_dto2fid(parent)), llr->llr_ost_idx,
2901                llr->llr_lov_idx, la->la_uid, la->la_gid);
2902
2903         rc = lfsck_layout_lock(env, com, parent, &lh,
2904                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2905         if (rc != 0)
2906                 RETURN(rc);
2907
2908         handle = dt_trans_create(env, pdev);
2909         if (IS_ERR(handle))
2910                 GOTO(unlock1, rc = PTR_ERR(handle));
2911
2912         o = lu_object_anon(env, d, NULL);
2913         if (IS_ERR(o))
2914                 GOTO(stop, rc = PTR_ERR(o));
2915
2916         child = container_of(o, struct dt_object, do_lu);
2917         o = lu_object_locate(o->lo_header, d->ld_type);
2918         if (unlikely(o == NULL))
2919                 GOTO(stop, rc = -EINVAL);
2920
2921         child = container_of(o, struct dt_object, do_lu);
2922         la->la_valid = LA_UID | LA_GID;
2923         hint->dah_parent = NULL;
2924         hint->dah_mode = 0;
2925         dof->dof_type = DFT_REGULAR;
2926         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2927         if (rc != 0)
2928                 GOTO(stop, rc);
2929
2930         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2931                                   LU_XATTR_REPLACE, handle);
2932         if (rc != 0)
2933                 GOTO(stop, rc);
2934
2935         rc = dt_trans_start(env, pdev, handle);
2936         if (rc != 0)
2937                 GOTO(stop, rc);
2938
2939         dt_write_lock(env, parent, 0);
2940         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2941                 GOTO(unlock2, rc = 0);
2942
2943         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2944         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2945                 GOTO(unlock2, rc = 0);
2946
2947         lmm = buf->lb_buf;
2948         rc = lfsck_layout_verify_header(lmm);
2949         if (rc != 0)
2950                 GOTO(unlock2, rc);
2951
2952         /* Someone change layout during the LFSCK, no need to repair then. */
2953         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2954                 GOTO(unlock2, rc = 0);
2955
2956         rc = dt_create(env, child, la, hint, dof, handle);
2957         if (rc != 0)
2958                 GOTO(unlock2, rc);
2959
2960         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2961          * been verified in lfsck_layout_verify_header() already. If some
2962          * new magic introduced in the future, then layout LFSCK needs to
2963          * be updated also. */
2964         magic = le32_to_cpu(lmm->lmm_magic);
2965         if (magic == LOV_MAGIC_V1) {
2966                 objs = &(lmm->lmm_objects[0]);
2967         } else {
2968                 LASSERT(magic == LOV_MAGIC_V3);
2969                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2970         }
2971
2972         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2973         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2974         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2975         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2976         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2977         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2978                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2979
2980         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2981
2982 unlock2:
2983         dt_write_unlock(env, parent);
2984
2985 stop:
2986         if (child != NULL)
2987                 lu_object_put(env, &child->do_lu);
2988
2989         dt_trans_stop(env, pdev, handle);
2990
2991 unlock1:
2992         lfsck_layout_unlock(&lh);
2993
2994         return rc;
2995 }
2996
2997 /* If the MDT-object and the OST-object have different owner information,
2998  * then trust the MDT-object, because the normal chown/chgrp handle order
2999  * is from MDT to OST, and it is possible that some chown/chgrp operation
3000  * is partly done. */
3001 static int lfsck_layout_repair_owner(const struct lu_env *env,
3002                                      struct lfsck_component *com,
3003                                      struct lfsck_layout_req *llr,
3004                                      struct lu_attr *pla)
3005 {
3006         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3007         struct lu_attr                  *tla    = &info->lti_la3;
3008         struct dt_object                *parent = llr->llr_parent->llo_obj;
3009         struct dt_object                *child  = llr->llr_child;
3010         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3011         struct thandle                  *handle;
3012         int                              rc;
3013         ENTRY;
3014
3015         CDEBUG(D_LFSCK, "Repair inconsistent file owner for: parent "DFID
3016                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
3017                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
3018                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
3019
3020         handle = dt_trans_create(env, dev);
3021         if (IS_ERR(handle))
3022                 RETURN(PTR_ERR(handle));
3023
3024         tla->la_uid = pla->la_uid;
3025         tla->la_gid = pla->la_gid;
3026         tla->la_valid = LA_UID | LA_GID;
3027         rc = dt_declare_attr_set(env, child, tla, handle);
3028         if (rc != 0)
3029                 GOTO(stop, rc);
3030
3031         rc = dt_trans_start(env, dev, handle);
3032         if (rc != 0)
3033                 GOTO(stop, rc);
3034
3035         /* Use the dt_object lock to serialize with destroy and attr_set. */
3036         dt_read_lock(env, parent, 0);
3037         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3038                 GOTO(unlock, rc = 1);
3039
3040         /* Get the latest parent's owner. */
3041         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3042         if (rc != 0) {
3043                 CWARN("%s: fail to get the latest parent's ("DFID") owner, "
3044                       "not sure whether some others chown/chgrp during the "
3045                       "LFSCK: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
3046                       PFID(lfsck_dto2fid(parent)), rc);
3047
3048                 GOTO(unlock, rc);
3049         }
3050
3051         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3052         if (unlikely(tla->la_uid != pla->la_uid ||
3053                      tla->la_gid != pla->la_gid))
3054                 GOTO(unlock, rc = 1);
3055
3056         tla->la_valid = LA_UID | LA_GID;
3057         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3058
3059         GOTO(unlock, rc);
3060
3061 unlock:
3062         dt_read_unlock(env, parent);
3063
3064 stop:
3065         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3066
3067         return rc;
3068 }
3069
3070 /* Check whether the OST-object correctly back points to the
3071  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3072 static int lfsck_layout_check_parent(const struct lu_env *env,
3073                                      struct lfsck_component *com,
3074                                      struct dt_object *parent,
3075                                      const struct lu_fid *pfid,
3076                                      const struct lu_fid *cfid,
3077                                      const struct lu_attr *pla,
3078                                      const struct lu_attr *cla,
3079                                      struct lfsck_layout_req *llr,
3080                                      struct lu_buf *lov_ea, __u32 idx)
3081 {
3082         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3083         struct lu_buf                   *buf    = &info->lti_big_buf;
3084         struct dt_object                *tobj;
3085         struct lov_mds_md_v1            *lmm;
3086         struct lov_ost_data_v1          *objs;
3087         int                              rc;
3088         int                              i;
3089         __u32                            magic;
3090         __u16                            count;
3091         ENTRY;
3092
3093         if (fid_is_zero(pfid)) {
3094                 /* client never wrote. */
3095                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3096                         if (unlikely(cla->la_uid != pla->la_uid ||
3097                                      cla->la_gid != pla->la_gid))
3098                                 RETURN (LLIT_INCONSISTENT_OWNER);
3099
3100                         RETURN(0);
3101                 }
3102
3103                 RETURN(LLIT_UNMATCHED_PAIR);
3104         }
3105
3106         if (unlikely(!fid_is_sane(pfid)))
3107                 RETURN(LLIT_UNMATCHED_PAIR);
3108
3109         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3110                 if (llr->llr_lov_idx == idx)
3111                         RETURN(0);
3112
3113                 RETURN(LLIT_UNMATCHED_PAIR);
3114         }
3115
3116         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3117         if (tobj == NULL)
3118                 RETURN(LLIT_UNMATCHED_PAIR);
3119
3120         if (IS_ERR(tobj))
3121                 RETURN(PTR_ERR(tobj));
3122
3123         if (!dt_object_exists(tobj))
3124                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3125
3126         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3127          * remote one on another MDT. Then check whether the given OST-object
3128          * is in such layout. If yes, it is multiple referenced, otherwise it
3129          * is unmatched referenced case. */
3130         rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
3131         if (rc == 0)
3132                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3133
3134         if (rc < 0)
3135                 GOTO(out, rc);
3136
3137         lmm = buf->lb_buf;
3138         rc = lfsck_layout_verify_header(lmm);
3139         if (rc != 0)
3140                 GOTO(out, rc);
3141
3142         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3143          * been verified in lfsck_layout_verify_header() already. If some
3144          * new magic introduced in the future, then layout LFSCK needs to
3145          * be updated also. */
3146         magic = le32_to_cpu(lmm->lmm_magic);
3147         if (magic == LOV_MAGIC_V1) {
3148                 objs = &(lmm->lmm_objects[0]);
3149         } else {
3150                 LASSERT(magic == LOV_MAGIC_V3);
3151                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3152         }
3153
3154         count = le16_to_cpu(lmm->lmm_stripe_count);
3155         for (i = 0; i < count; i++, objs++) {
3156                 struct lu_fid           *tfid   = &info->lti_fid2;
3157                 struct ost_id           *oi     = &info->lti_oi;
3158
3159                 if (is_dummy_lov_ost_data(objs))
3160                         continue;
3161
3162                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3163                 ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
3164                 if (lu_fid_eq(cfid, tfid)) {
3165                         *lov_ea = *buf;
3166
3167                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3168                 }
3169         }
3170
3171         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3172
3173 out:
3174         lfsck_object_put(env, tobj);
3175
3176         return rc;
3177 }
3178
/*
 * Verify one parent (MDT-object) / child (OST-object) pair described by @llr
 * and, unless LPF_DRYRUN is set in the bookmark, call the repair routine
 * that matches the inconsistency found.
 *
 * Checks, in order: the child must exist (otherwise LLIT_DANGLING), its
 * XATTR_NAME_FID xattr must have one of the two known sizes, the parent FID
 * and stripe index stored there must agree with @llr (see
 * lfsck_layout_check_parent()), and the uid/gid of both objects must match.
 *
 * Returns 0 when nothing was wrong, when the parent is dying, or when the
 * failure was one of the "cannot reach the target" errnos (the layout is
 * then flagged LF_INCOMPLETE); a positive value when an inconsistency was
 * detected in dry-run mode or the repair routine returned a positive value;
 * a negative errno otherwise. The counters in com->lc_file_ram are updated
 * under com->lc_sem on every path that reaches "out".
 */
3179 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3180                                              struct lfsck_component *com,
3181                                              struct lfsck_layout_req *llr)
3182 {
3183         struct lfsck_layout                  *lo     = com->lc_file_ram;
3184         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3185         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3186         struct lu_fid                        *pfid   = &info->lti_fid;
3187         struct lu_buf                        *buf    = NULL;
3188         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3189         struct dt_object                     *child  = llr->llr_child;
3190         struct lu_attr                       *pla    = &info->lti_la;
3191         struct lu_attr                       *cla    = &info->lti_la2;
3192         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3193         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3194         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3195         __u32                                 idx    = 0;
3196         int                                   rc;
3197         ENTRY;
3198
3199         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3200         if (rc != 0) {
                     /* A dying parent is not an error and is not counted:
                      * leave without going through "out". */
3201                 if (lu_object_is_dying(parent->do_lu.lo_header))
3202                         RETURN(0);
3203
3204                 GOTO(out, rc);
3205         }
3206
3207         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3208         if (rc == -ENOENT) {
3209                 if (lu_object_is_dying(parent->do_lu.lo_header))
3210                         RETURN(0);
3211
                     /* The layout references an OST-object that does not
                      * exist while the parent is still alive. */
3212                 type = LLIT_DANGLING;
3213                 goto repair;
3214         }
3215
3216         if (rc != 0)
3217                 GOTO(out, rc);
3218
             /* Read the child's back pointer (XATTR_NAME_FID). A non-negative
              * size other than the two known structure sizes is treated as
              * an unmatched pair. */
3219         buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
3220         rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
3221         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3222                      rc != sizeof(struct filter_fid))) {
3223                 type = LLIT_UNMATCHED_PAIR;
3224                 goto repair;
3225         }
3226
3227         if (rc < 0 && rc != -ENODATA)
3228                 GOTO(out, rc);
3229
3230         if (rc == -ENODATA) {
                     /* No back pointer xattr: pass a zero parent FID on to
                      * lfsck_layout_check_parent(). */
3231                 fid_zero(pfid);
3232         } else {
3233                 fid_le_to_cpu(pfid, &pea->ff_parent);
3234                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3235                  * real parent MDT-object's FID::f_ver, instead it is the
3236                  * OST-object index in its parent MDT-object's layout EA. */
3237                 idx = pfid->f_stripe_idx;
3238                 pfid->f_ver = 0;
3239         }
3240
3241         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3242                                        lu_object_fid(&child->do_lu),
3243                                        pla, cla, llr, buf, idx);
3244         if (rc > 0) {
                     /* A positive return is used directly as the
                      * inconsistency type. */
3245                 type = rc;
3246                 goto repair;
3247         }
3248
3249         if (rc < 0)
3250                 GOTO(out, rc);
3251
3252         if (unlikely(cla->la_uid != pla->la_uid ||
3253                      cla->la_gid != pla->la_gid)) {
3254                 type = LLIT_INCONSISTENT_OWNER;
3255                 goto repair;
3256         }
3257
     /* Also reached by falling through with type == LLIT_NONE, in which case
      * both branches below end up with rc = 0. */
3258 repair:
3259         if (bk->lb_param & LPF_DRYRUN) {
                     /* Dry run: only report whether something was found. */
3260                 if (type != LLIT_NONE)
3261                         GOTO(out, rc = 1);
3262                 else
3263                         GOTO(out, rc = 0);
3264         }
3265
3266         switch (type) {
3267         case LLIT_DANGLING:
                     /* Build the attributes for the OST-object to be
                      * re-created: the parent's owner and a regular-file
                      * mode; everything else is zeroed by the memset. */
3268                 memset(cla, 0, sizeof(*cla));
3269                 cla->la_uid = pla->la_uid;
3270                 cla->la_gid = pla->la_gid;
3271                 cla->la_mode = S_IFREG | 0666;
3272                 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
3273                                 LA_ATIME | LA_MTIME | LA_CTIME;
3274                 rc = lfsck_layout_recreate_ostobj(env, com, llr, cla);
3275                 break;
3276         case LLIT_UNMATCHED_PAIR:
3277                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3278                 break;
3279         case LLIT_MULTIPLE_REFERENCED:
3280                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3281                                                              pla, buf);
3282                 break;
3283         case LLIT_INCONSISTENT_OWNER:
3284                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3285                 break;
3286         default:
3287                 rc = 0;
3288                 break;
3289         }
3290
3291         GOTO(out, rc);
3292
3293 out:
             /* Account the result in the layout statistics under the
              * component semaphore. */
3294         down_write(&com->lc_sem);
3295         if (rc < 0) {
3296                 /* If cannot touch the target server,
3297                  * mark the LFSCK as INCOMPLETE. */
3298                 if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT ||
3299                     rc == -EHOSTDOWN || rc == -EHOSTUNREACH) {
3300                         CERROR("%s: Fail to talk with OST %x: rc = %d.\n",
3301                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3302                         lo->ll_flags |= LF_INCOMPLETE;
3303                         lo->ll_objs_skipped++;
3304                         rc = 0;
3305                 } else {
3306                         lo->ll_objs_failed_phase1++;
3307                 }
3308         } else if (rc > 0) {
3309                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3310                          "unknown type = %d\n", type);
3311
                     /* ll_objs_repaired[] is indexed by type - 1. */
3312                 lo->ll_objs_repaired[type - 1]++;
3313         }
3314         up_write(&com->lc_sem);
3315
3316         return rc;
3317 }
3318
/*
 * Main function of the layout LFSCK assistant thread.
 *
 * @args is a struct lfsck_thread_args; it is released with
 * lfsck_thread_args_fini() before returning.
 *
 * After sending LE_START through lfsck_layout_master_notify_others() and
 * setting SVC_RUNNING, the function loops:
 *  - it drains llmd_req_list, handling each request with
 *    lfsck_layout_assistant_handle_one();
 *  - when llmd_to_post is set, it sends LE_PHASE1_DONE;
 *  - when llmd_to_double_scan is set, it enters the double scan: it queries
 *    the other targets, and for every OST found on llmd_ost_phase2_list
 *    scans orphans (only with LPF_ALL_TGT), until llmd_ost_phase1_list is
 *    empty.
 * On exit it sends a final event chosen from the result, records the double
 * scan result unless llmd_exit is set, stores llmd_assistant_status and sets
 * SVC_STOPPED.
 */
3319 static int lfsck_layout_assistant(void *args)
3320 {
3321         struct lfsck_thread_args        *lta     = args;
3322         struct lu_env                   *env     = &lta->lta_env;
3323         struct lfsck_component          *com     = lta->lta_com;
3324         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3325         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3326         struct lfsck_position           *pos     = &com->lc_pos_start;
3327         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3328         struct lfsck_request            *lr      = &info->lti_lr;
3329         struct lfsck_layout_master_data *llmd    = com->lc_data;
3330         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3331         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3332         struct lfsck_layout_req         *llr;
3333         struct l_wait_info               lwi     = { 0 };
3334         int                              rc      = 0;
3335         int                              rc1     = 0;
3336         ENTRY;
3337
             /* Build the LE_START request from the bookmark settings. */
3338         memset(lr, 0, sizeof(*lr));
3339         lr->lr_event = LE_START;
3340         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3341                        LSV_ASYNC_WINDOWS;
3342         lr->lr_speed = bk->lb_speed_limit;
3343         lr->lr_version = bk->lb_version;
3344         lr->lr_param = bk->lb_param;
3345         lr->lr_async_windows = bk->lb_async_windows;
3346         lr->lr_flags = LEF_TO_OST;
             /* NOTE(review): a start cookie <= 1 presumably means the scan
              * begins from the start, hence LPF_RESET - confirm. */
3347         if (pos->lp_oit_cookie <= 1)
3348                 lr->lr_param |= LPF_RESET;
3349
3350         rc = lfsck_layout_master_notify_others(env, com, lr);
3351         if (rc != 0) {
3352                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
3353                        lfsck_lfsck2name(lfsck), rc);
3354                 GOTO(fini, rc);
3355         }
3356
             /* Publish the running state and wake whoever waits on the
              * main engine thread's waitq. */
3357         spin_lock(&llmd->llmd_lock);
3358         thread_set_flags(athread, SVC_RUNNING);
3359         spin_unlock(&llmd->llmd_lock);
3360         wake_up_all(&mthread->t_ctl_waitq);
3361
3362         while (1) {
3363                 while (!list_empty(&llmd->llmd_req_list)) {
3364                         bool wakeup = false;
3365
3366                         if (unlikely(llmd->llmd_exit))
3367                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3368
3369                         llr = list_entry(llmd->llmd_req_list.next,
3370                                          struct lfsck_layout_req,
3371                                          llr_list);
3372                         /* Only the lfsck_layout_assistant thread itself can
3373                          * remove the "llr" from the head of the list, LFSCK
3374                          * engine thread only inserts other new "llr" at the
3375                          * end of the list. So it is safe to handle current
3376                          * "llr" without the spin_lock. */
3377                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3378                         spin_lock(&llmd->llmd_lock);
3379                         list_del_init(&llr->llr_list);
3380                         llmd->llmd_prefetched--;
3381                         /* Wake up the main engine thread only when the list
3382                          * is empty or half of the prefetched items have been
3383                          * handled to avoid too frequent thread schedule. */
3384                         if (llmd->llmd_prefetched == 0 ||
3385                             (bk->lb_async_windows != 0 &&
3386                              bk->lb_async_windows / 2 ==
3387                              llmd->llmd_prefetched))
3388                                 wakeup = true;
3389                         spin_unlock(&llmd->llmd_lock);
3390                         if (wakeup)
3391                                 wake_up_all(&mthread->t_ctl_waitq);
3392
3393                         lfsck_layout_req_fini(env, llr);
                             /* A failure stops the thread only in failout
                              * mode; otherwise go on with the next one. */
3394                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3395                                 GOTO(cleanup1, rc);
3396                 }
3397
                     /* Sleep until there is a new request or one of the
                      * control flags is raised. */
3398                 l_wait_event(athread->t_ctl_waitq,
3399                              !lfsck_layout_req_empty(llmd) ||
3400                              llmd->llmd_exit ||
3401                              llmd->llmd_to_post ||
3402                              llmd->llmd_to_double_scan,
3403                              &lwi);
3404
3405                 if (unlikely(llmd->llmd_exit))
3406                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3407
                     /* Pending requests are handled before the post and
                      * double scan flags are looked at. */
3408                 if (!list_empty(&llmd->llmd_req_list))
3409                         continue;
3410
3411                 if (llmd->llmd_to_post) {
3412                         llmd->llmd_to_post = 0;
3413                         LASSERT(llmd->llmd_post_result > 0);
3414
3415                         memset(lr, 0, sizeof(*lr));
3416                         lr->lr_event = LE_PHASE1_DONE;
3417                         lr->lr_status = llmd->llmd_post_result;
3418                         rc = lfsck_layout_master_notify_others(env, com, lr);
3419                         if (rc != 0)
3420                                 CERROR("%s: failed to notify others "
3421                                        "for layout post: rc = %d\n",
3422                                        lfsck_lfsck2name(lfsck), rc);
3423
3424                         /* Wakeup the master engine to go ahead. */
3425                         wake_up_all(&mthread->t_ctl_waitq);
3426                 }
3427
3428                 if (llmd->llmd_to_double_scan) {
3429                         llmd->llmd_to_double_scan = 0;
3430                         atomic_inc(&lfsck->li_double_scan_count);
3431                         llmd->llmd_in_double_scan = 1;
3432                         wake_up_all(&mthread->t_ctl_waitq);
3433
                             /* Restart the per-checkpoint counters and
                              * timers for the double scan. */
3434                         com->lc_new_checked = 0;
3435                         com->lc_new_scanned = 0;
3436                         com->lc_time_last_checkpoint = cfs_time_current();
3437                         com->lc_time_next_checkpoint =
3438                                 com->lc_time_last_checkpoint +
3439                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3440
3441                         while (llmd->llmd_in_double_scan) {
3442                                 struct lfsck_tgt_descs  *ltds =
3443                                                         &lfsck->li_ost_descs;
3444                                 struct lfsck_tgt_desc   *ltd;
3445
                                     /* The query result is only examined if
                                      * there is no orphan work to do yet. */
3446                                 rc = lfsck_layout_master_query_others(env, com);
3447                                 if (lfsck_layout_master_to_orphan(llmd))
3448                                         goto orphan;
3449
3450                                 if (rc < 0)
3451                                         GOTO(cleanup2, rc);
3452
3453                                 /* Pull LFSCK status on related targets once
3454                                  * per 30 seconds if we are not notified. */
3455                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3456                                                            cfs_time_seconds(1),
3457                                                            NULL, NULL);
3458                                 rc = l_wait_event(athread->t_ctl_waitq,
3459                                         lfsck_layout_master_to_orphan(llmd) ||
3460                                         llmd->llmd_exit ||
3461                                         !thread_is_running(mthread),
3462                                         &lwi);
3463
3464                                 if (unlikely(llmd->llmd_exit ||
3465                                              !thread_is_running(mthread)))
3466                                         GOTO(cleanup2, rc = 0);
3467
                                     /* Timed out without notification:
                                      * query the targets again. */
3468                                 if (rc == -ETIMEDOUT)
3469                                         continue;
3470
3471                                 if (rc < 0)
3472                                         GOTO(cleanup2, rc);
3473
3474 orphan:
                                     /* Take the OSTs off the phase2 list one
                                      * by one; ltd_lock is dropped while an
                                      * OST is processed and re-taken before
                                      * the list is examined again. */
3475                                 spin_lock(&ltds->ltd_lock);
3476                                 while (!list_empty(
3477                                                 &llmd->llmd_ost_phase2_list)) {
3478                                         ltd = list_entry(
3479                                               llmd->llmd_ost_phase2_list.next,
3480                                               struct lfsck_tgt_desc,
3481                                               ltd_layout_phase_list);
3482                                         list_del_init(
3483                                                 &ltd->ltd_layout_phase_list);
3484                                         spin_unlock(&ltds->ltd_lock);
3485
3486                                         if (bk->lb_param & LPF_ALL_TGT) {
3487                                                 rc = lfsck_layout_scan_orphan(
3488                                                                 env, com, ltd);
3489                                                 if (rc != 0 &&
3490                                                     bk->lb_param & LPF_FAILOUT)
3491                                                         GOTO(cleanup2, rc);
3492                                         }
3493
3494                                         if (unlikely(llmd->llmd_exit ||
3495                                                 !thread_is_running(mthread)))
3496                                                 GOTO(cleanup2, rc = 0);
3497
3498                                         spin_lock(&ltds->ltd_lock);
3499                                 }
3500
                                     /* ltd_lock is held here. The double
                                      * scan is complete (rc = 1) once the
                                      * phase1 list is empty as well. */
3501                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3502                                         spin_unlock(&ltds->ltd_lock);
3503                                         GOTO(cleanup2, rc = 1);
3504                                 }
3505                                 spin_unlock(&ltds->ltd_lock);
3506                         }
3507                 }
3508         }
3509
3510 cleanup1:
3511         /* Cleanup the unfinished requests. */
3512         spin_lock(&llmd->llmd_lock);
3513         if (rc < 0)
3514                 llmd->llmd_assistant_status = rc;
3515
             /* llmd_lock is dropped around lfsck_layout_req_fini() for each
              * request and re-taken afterwards. */
3516         while (!list_empty(&llmd->llmd_req_list)) {
3517                 llr = list_entry(llmd->llmd_req_list.next,
3518                                  struct lfsck_layout_req,
3519                                  llr_list);
3520                 list_del_init(&llr->llr_list);
3521                 llmd->llmd_prefetched--;
3522                 spin_unlock(&llmd->llmd_lock);
3523                 lfsck_layout_req_fini(env, llr);
3524                 spin_lock(&llmd->llmd_lock);
3525         }
3526         spin_unlock(&llmd->llmd_lock);
3527
3528         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3529                  llmd->llmd_prefetched);
3530
3531 cleanup2:
             /* Choose the final event and status from the sign of rc:
              * positive -> LE_PHASE2_DONE, zero -> stopped/paused,
              * negative -> failed. */
3532         memset(lr, 0, sizeof(*lr));
3533         if (rc > 0) {
3534                 lr->lr_event = LE_PHASE2_DONE;
3535                 lr->lr_status = rc;
3536         } else if (rc == 0) {
3537                 if (lfsck->li_flags & LPF_ALL_TGT) {
3538                         lr->lr_event = LE_STOP;
3539                         lr->lr_status = LS_STOPPED;
3540                 } else {
3541                         lr->lr_event = LE_PEER_EXIT;
3542                         switch (lfsck->li_status) {
3543                         case LS_PAUSED:
3544                         case LS_CO_PAUSED:
3545                                 lr->lr_status = LS_CO_PAUSED;
3546                                 break;
3547                         case LS_STOPPED:
3548                         case LS_CO_STOPPED:
3549                                 lr->lr_status = LS_CO_STOPPED;
3550                                 break;
3551                         default:
3552                                 CERROR("%s: unknown status: rc = %d\n",
3553                                        lfsck_lfsck2name(lfsck),
3554                                        lfsck->li_status);
3555                                 lr->lr_status = LS_CO_FAILED;
3556                                 break;
3557                         }
3558                 }
3559         } else {
3560                 if (lfsck->li_flags & LPF_ALL_TGT) {
3561                         lr->lr_event = LE_STOP;
3562                         lr->lr_status = LS_FAILED;
3563                 } else {
3564                         lr->lr_event = LE_PEER_EXIT;
3565                         lr->lr_status = LS_CO_FAILED;
3566                 }
3567         }
3568
             /* A failure to deliver the final notification replaces rc. */
3569         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3570         if (rc1 != 0) {
3571                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
3572                        lfsck_lfsck2name(lfsck), rc1);
3573                 rc = rc1;
3574         }
3575
3576         /* Under force exit case, some requests may be just freed without
3577          * verification, those objects should be re-handled when next run.
3578          * So not update the on-disk tracing file under such case. */
3579         if (!llmd->llmd_exit)
3580                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
3581
3582 fini:
             /* Undo the li_double_scan_count increment taken when the
              * double scan was entered. */
3583         if (llmd->llmd_in_double_scan)
3584                 atomic_dec(&lfsck->li_double_scan_count);
3585
             /* Publish the final status (rc1 takes precedence when it is
              * non-zero) and the stopped state, then wake the waiters on
              * the main engine thread's waitq. */
3586         spin_lock(&llmd->llmd_lock);
3587         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3588         thread_set_flags(athread, SVC_STOPPED);
3589         wake_up_all(&mthread->t_ctl_waitq);
3590         spin_unlock(&llmd->llmd_lock);
3591         lfsck_thread_args_fini(lta);
3592
3593         return rc;
3594 }
3595
3596 static int
3597 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3598                                    struct ptlrpc_request *req,
3599                                    void *args, int rc)
3600 {
3601         struct lfsck_layout_slave_async_args *llsaa = args;
3602         struct obd_export                    *exp   = llsaa->llsaa_exp;
3603         struct lfsck_component               *com   = llsaa->llsaa_com;
3604         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3605         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3606         bool                                  done  = false;
3607
3608         if (rc != 0) {
3609                 /* It is quite probably caused by target crash,
3610                  * to make the LFSCK can go ahead, assume that
3611                  * the target finished the LFSCK prcoessing. */
3612                 done = true;
3613         } else {
3614                 struct lfsck_reply *lr;
3615
3616                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3617                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3618                     lr->lr_status != LS_SCANNING_PHASE2)
3619                         done = true;
3620         }
3621         if (done)
3622                 lfsck_layout_llst_del(llsd, llst);
3623         lfsck_layout_llst_put(llst);
3624         lfsck_component_put(env, com);
3625         class_export_put(exp);
3626
3627         return 0;
3628 }
3629
3630 static int lfsck_layout_async_query(const struct lu_env *env,
3631                                     struct lfsck_component *com,
3632                                     struct obd_export *exp,
3633                                     struct lfsck_layout_slave_target *llst,
3634                                     struct lfsck_request *lr,
3635                                     struct ptlrpc_request_set *set)
3636 {
3637         struct lfsck_layout_slave_async_args *llsaa;
3638         struct ptlrpc_request                *req;
3639         struct lfsck_request                 *tmp;
3640         int                                   rc;
3641         ENTRY;
3642
3643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3644         if (req == NULL)
3645                 RETURN(-ENOMEM);
3646
3647         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3648         if (rc != 0) {
3649                 ptlrpc_request_free(req);
3650                 RETURN(rc);
3651         }
3652
3653         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3654         *tmp = *lr;
3655         ptlrpc_request_set_replen(req);
3656
3657         llsaa = ptlrpc_req_async_args(req);
3658         llsaa->llsaa_exp = exp;
3659         llsaa->llsaa_com = lfsck_component_get(com);
3660         llsaa->llsaa_llst = llst;
3661         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3662         ptlrpc_set_add_req(set, req);
3663
3664         RETURN(0);
3665 }
3666
3667 static int lfsck_layout_async_notify(const struct lu_env *env,
3668                                      struct obd_export *exp,
3669                                      struct lfsck_request *lr,
3670                                      struct ptlrpc_request_set *set)
3671 {
3672         struct ptlrpc_request   *req;
3673         struct lfsck_request    *tmp;
3674         int                      rc;
3675         ENTRY;
3676
3677         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3678         if (req == NULL)
3679                 RETURN(-ENOMEM);
3680
3681         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3682         if (rc != 0) {
3683                 ptlrpc_request_free(req);
3684                 RETURN(rc);
3685         }
3686
3687         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3688         *tmp = *lr;
3689         ptlrpc_request_set_replen(req);
3690         ptlrpc_set_add_req(set, req);
3691
3692         RETURN(0);
3693 }
3694
/*
 * Send an LE_QUERY request for the layout LFSCK to every target on
 * llsd_master_list and wait for all the replies.
 *
 * The list is walked with a generation stamp: llsd_touch_gen is bumped once
 * per call, each visited entry is stamped with it and moved to the tail, and
 * the walk stops when the head entry already carries the current stamp.
 * llsd_lock is dropped while an entry is processed; the entry is kept alive
 * across that window by the llst_ref taken beforehand.
 *
 * Returns -ENOMEM if the request set cannot be allocated; otherwise the last
 * error from lfsck_layout_async_query() if there was one, else the result of
 * ptlrpc_set_wait().
 */
3695 static int
3696 lfsck_layout_slave_query_master(const struct lu_env *env,
3697                                 struct lfsck_component *com)
3698 {
3699         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3700         struct lfsck_instance            *lfsck = com->lc_lfsck;
3701         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3702         struct lfsck_layout_slave_target *llst;
3703         struct obd_export                *exp;
3704         struct ptlrpc_request_set        *set;
3705         int                               rc    = 0;
3706         int                               rc1   = 0;
3707         ENTRY;
3708
3709         set = ptlrpc_prep_set();
3710         if (set == NULL)
3711                 RETURN(-ENOMEM);
3712
             /* The same request body is copied into every query RPC. */
3713         memset(lr, 0, sizeof(*lr));
3714         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3715         lr->lr_event = LE_QUERY;
3716         lr->lr_active = LT_LAYOUT;
3717
3718         llsd->llsd_touch_gen++;
3719         spin_lock(&llsd->llsd_lock);
3720         while (!list_empty(&llsd->llsd_master_list)) {
3721                 llst = list_entry(llsd->llsd_master_list.next,
3722                                   struct lfsck_layout_slave_target,
3723                                   llst_list);
                     /* The head was already visited in this pass, so the
                      * whole list has been walked. */
3724                 if (llst->llst_gen == llsd->llsd_touch_gen)
3725                         break;
3726
                     /* Stamp the entry, rotate it to the tail and pin it
                      * before llsd_lock is released. */
3727                 llst->llst_gen = llsd->llsd_touch_gen;
3728                 list_del(&llst->llst_list);
3729                 list_add_tail(&llst->llst_list,
3730                               &llsd->llsd_master_list);
3731                 atomic_inc(&llst->llst_ref);
3732                 spin_unlock(&llsd->llsd_lock);
3733
3734                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3735                                                llst->llst_index);
3736                 if (exp == NULL) {
                             /* No export for this index: remove the target
                              * and drop the reference taken above. */
3737                         lfsck_layout_llst_del(llsd, llst);
3738                         lfsck_layout_llst_put(llst);
3739                         spin_lock(&llsd->llsd_lock);
3740                         continue;
3741                 }
3742
                     /* On success the llst and export are released by the
                      * reply handler; on failure they are released here. */
3743                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3744                 if (rc != 0) {
3745                         CERROR("%s: slave fail to query %s for layout: "
3746                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3747                                exp->exp_obd->obd_name, rc);
3748                         rc1 = rc;
3749                         lfsck_layout_llst_put(llst);
3750                         class_export_put(exp);
3751                 }
3752                 spin_lock(&llsd->llsd_lock);
3753         }
3754         spin_unlock(&llsd->llsd_lock);
3755
             /* Wait for the queued queries, then release the set. */
3756         rc = ptlrpc_set_wait(set);
3757         ptlrpc_set_destroy(set);
3758
3759         RETURN(rc1 != 0 ? rc1 : rc);
3760 }
3761
3762 static void
3763 lfsck_layout_slave_notify_master(const struct lu_env *env,
3764                                  struct lfsck_component *com,
3765                                  enum lfsck_events event, int result)
3766 {
3767         struct lfsck_instance            *lfsck = com->lc_lfsck;
3768         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3769         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3770         struct lfsck_layout_slave_target *llst;
3771         struct obd_export                *exp;
3772         struct ptlrpc_request_set        *set;
3773         int                               rc;
3774         ENTRY;
3775
3776         set = ptlrpc_prep_set();
3777         if (set == NULL)
3778                 RETURN_EXIT;
3779
3780         memset(lr, 0, sizeof(*lr));
3781         lr->lr_event = event;
3782         lr->lr_flags = LEF_FROM_OST;
3783         lr->lr_status = result;
3784         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3785         lr->lr_active = LT_LAYOUT;
3786         llsd->llsd_touch_gen++;
3787         spin_lock(&llsd->llsd_lock);
3788         while (!list_empty(&llsd->llsd_master_list)) {
3789                 llst = list_entry(llsd->llsd_master_list.next,
3790                                   struct lfsck_layout_slave_target,
3791                                   llst_list);
3792                 if (llst->llst_gen == llsd->llsd_touch_gen)
3793                         break;
3794
3795                 llst->llst_gen = llsd->llsd_touch_gen;
3796                 list_del(&llst->llst_list);
3797                 list_add_tail(&llst->llst_list,
3798                               &llsd->llsd_master_list);
3799                 atomic_inc(&llst->llst_ref);
3800                 spin_unlock(&llsd->llsd_lock);
3801
3802                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3803                                                llst->llst_index);
3804                 if (exp == NULL) {
3805                         lfsck_layout_llst_del(llsd, llst);
3806                         lfsck_layout_llst_put(llst);
3807                         spin_lock(&llsd->llsd_lock);
3808                         continue;
3809                 }
3810
3811                 rc = lfsck_layout_async_notify(env, exp, lr, set);
3812                 if (rc != 0)
3813                         CERROR("%s: slave fail to notify %s for layout: "
3814                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3815                                exp->exp_obd->obd_name, rc);
3816                 lfsck_layout_llst_put(llst);
3817                 class_export_put(exp);
3818                 spin_lock(&llsd->llsd_lock);
3819         }
3820         spin_unlock(&llsd->llsd_lock);
3821
3822         ptlrpc_set_wait(set);
3823         ptlrpc_set_destroy(set);
3824
3825         RETURN_EXIT;
3826 }
3827
3828 /*
3829  * \ret -ENODATA: unrecognized stripe
3830  * \ret = 0     : recognized stripe
3831  * \ret < 0     : other failures
3832  */
3833 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
3834                                            struct lfsck_component *com,
3835                                            struct lu_fid *cfid,
3836                                            struct lu_fid *pfid)
3837 {
3838         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3839         struct lu_buf                   *buf    = &info->lti_big_buf;
3840         struct ost_id                   *oi     = &info->lti_oi;
3841         struct dt_object                *obj;
3842         struct lov_mds_md_v1            *lmm;
3843         struct lov_ost_data_v1          *objs;
3844         __u32                            idx    = pfid->f_stripe_idx;
3845         __u32                            magic;
3846         int                              rc     = 0;
3847         int                              i;
3848         __u16                            count;
3849         ENTRY;
3850
3851         pfid->f_ver = 0;
3852         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3853         if (IS_ERR(obj))
3854                 RETURN(PTR_ERR(obj));
3855
3856         dt_read_lock(env, obj, 0);
3857         if (unlikely(!dt_object_exists(obj)))
3858                 GOTO(unlock, rc = -ENOENT);
3859
3860         rc = lfsck_layout_get_lovea(env, obj, buf, NULL);
3861         if (rc < 0)
3862                 GOTO(unlock, rc);
3863
3864         if (rc == 0)
3865                 GOTO(unlock, rc = -ENODATA);
3866
3867         lmm = buf->lb_buf;
3868         rc = lfsck_layout_verify_header(lmm);
3869         if (rc != 0)
3870                 GOTO(unlock, rc);
3871
3872         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3873          * been verified in lfsck_layout_verify_header() already. If some
3874          * new magic introduced in the future, then layout LFSCK needs to
3875          * be updated also. */
3876         magic = le32_to_cpu(lmm->lmm_magic);
3877         if (magic == LOV_MAGIC_V1) {
3878                 objs = &(lmm->lmm_objects[0]);
3879         } else {
3880                 LASSERT(magic == LOV_MAGIC_V3);
3881                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3882         }
3883
3884         fid_to_ostid(cfid, oi);
3885         count = le16_to_cpu(lmm->lmm_stripe_count);
3886         for (i = 0; i < count; i++, objs++) {
3887                 struct ost_id oi2;
3888
3889                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
3890                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
3891                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
3892         }
3893
3894         GOTO(unlock, rc = -ENODATA);
3895
3896 unlock:
3897         dt_read_unlock(env, obj);
3898         lu_object_put(env, &obj->do_lu);
3899
3900         return rc;
3901 }
3902
3903 /*
3904  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
3905  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
3906  * layout EA from MDT to OST. On one hand, the OST no need to understand
3907  * the layout EA structure; on the other hand, it may cause trouble when
3908  * transfer large layout EA from MDT to OST via normal OUT RPC.
3909  *
3910  * \ret > 0: unrecognized stripe
3911  * \ret = 0: recognized stripe
3912  * \ret < 0: other failures
3913  */
3914 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
3915                                           struct lfsck_component *com,
3916                                           struct lu_fid *cfid,
3917                                           struct lu_fid *pfid)
3918 {
3919         struct lfsck_instance    *lfsck  = com->lc_lfsck;
3920         struct obd_device        *obd    = lfsck->li_obd;
3921         struct seq_server_site   *ss     =
3922                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
3923         struct obd_export        *exp    = NULL;
3924         struct ptlrpc_request    *req    = NULL;
3925         struct lfsck_request     *lr;
3926         struct lu_seq_range       range  = { 0 };
3927         int                       rc     = 0;
3928         ENTRY;
3929
3930         if (unlikely(fid_is_idif(pfid)))
3931                 RETURN(1);
3932
3933         fld_range_set_any(&range);
3934         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
3935         if (rc != 0)
3936                 RETURN(rc == -ENOENT ? 1 : rc);
3937
3938         if (unlikely(!fld_range_is_mdt(&range)))
3939                 RETURN(1);
3940
3941         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
3942         if (unlikely(exp == NULL))
3943                 RETURN(1);
3944
3945         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
3946                 GOTO(out, rc = -EOPNOTSUPP);
3947
3948         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3949         if (req == NULL)
3950                 GOTO(out, rc = -ENOMEM);
3951
3952         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3953         if (rc != 0) {
3954                 ptlrpc_request_free(req);
3955
3956                 GOTO(out, rc);
3957         }
3958
3959         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3960         memset(lr, 0, sizeof(*lr));
3961         lr->lr_event = LE_PAIRS_VERIFY;
3962         lr->lr_active = LT_LAYOUT;
3963         lr->lr_fid = *cfid; /* OST-object itself FID. */
3964         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
3965
3966         ptlrpc_request_set_replen(req);
3967         rc = ptlrpc_queue_wait(req);
3968         ptlrpc_req_finished(req);
3969
3970         if (rc == -ENOENT || rc == -ENODATA)
3971                 rc = 1;
3972
3973         GOTO(out, rc);
3974
3975 out:
3976         if (exp != NULL)
3977                 class_export_put(exp);
3978
3979         return rc;
3980 }
3981
3982 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
3983                                           struct lfsck_component *com,
3984                                           struct lfsck_request *lr)
3985 {
3986         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3987         struct filter_fid               *ff     = &info->lti_new_pfid;
3988         struct lu_buf                   *buf;
3989         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
3990         struct dt_object                *obj;
3991         struct thandle                  *th     = NULL;
3992         int                              rc     = 0;
3993         ENTRY;
3994
3995         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
3996         if (IS_ERR(obj))
3997                 RETURN(PTR_ERR(obj));
3998
3999         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
4000         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4001         dt_write_lock(env, obj, 0);
4002         if (unlikely(!dt_object_exists(obj)))
4003                 GOTO(unlock, rc = 0);
4004
4005         th = dt_trans_create(env, dev);
4006         if (IS_ERR(th))
4007                 GOTO(unlock, rc = PTR_ERR(th));
4008
4009         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4010         if (rc != 0)
4011                 GOTO(stop, rc);
4012
4013         rc = dt_trans_start_local(env, dev, th);
4014         if (rc != 0)
4015                 GOTO(stop, rc);
4016
4017         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4018
4019         GOTO(stop, rc);
4020
4021 stop:
4022         dt_trans_stop(env, dev, th);
4023
4024 unlock:
4025         dt_write_unlock(env, obj);
4026         lu_object_put(env, &obj->do_lu);
4027
4028         return rc;
4029 }
4030
4031 /* layout APIs */
4032
4033 static int lfsck_layout_reset(const struct lu_env *env,
4034                               struct lfsck_component *com, bool init)
4035 {
4036         struct lfsck_layout     *lo    = com->lc_file_ram;
4037         int                      rc;
4038
4039         down_write(&com->lc_sem);
4040         if (init) {
4041                 memset(lo, 0, com->lc_file_size);
4042         } else {
4043                 __u32 count = lo->ll_success_count;
4044                 __u64 last_time = lo->ll_time_last_complete;
4045
4046                 memset(lo, 0, com->lc_file_size);
4047                 lo->ll_success_count = count;
4048                 lo->ll_time_last_complete = last_time;
4049         }
4050
4051         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4052         lo->ll_status = LS_INIT;
4053
4054         rc = lfsck_layout_store(env, com);
4055         up_write(&com->lc_sem);
4056
4057         return rc;
4058 }
4059
4060 static void lfsck_layout_fail(const struct lu_env *env,
4061                               struct lfsck_component *com, bool new_checked)
4062 {
4063         struct lfsck_layout *lo = com->lc_file_ram;
4064
4065         down_write(&com->lc_sem);
4066         if (new_checked)
4067                 com->lc_new_checked++;
4068         lo->ll_objs_failed_phase1++;
4069         if (lo->ll_pos_first_inconsistent == 0) {
4070                 struct lfsck_instance *lfsck = com->lc_lfsck;
4071
4072                 lo->ll_pos_first_inconsistent =
4073                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
4074                                                         lfsck->li_di_oit);
4075         }
4076         up_write(&com->lc_sem);
4077 }
4078
4079 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4080                                           struct lfsck_component *com, bool init)
4081 {
4082         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4083         struct lfsck_layout             *lo      = com->lc_file_ram;
4084         struct lfsck_layout_master_data *llmd    = com->lc_data;
4085         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4086         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4087         struct l_wait_info               lwi     = { 0 };
4088         int                              rc;
4089
4090         if (com->lc_new_checked == 0 && !init)
4091                 return 0;
4092
4093         l_wait_event(mthread->t_ctl_waitq,
4094                      list_empty(&llmd->llmd_req_list) ||
4095                      !thread_is_running(mthread) ||
4096                      thread_is_stopped(athread),
4097                      &lwi);
4098
4099         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4100                 return 0;
4101
4102         down_write(&com->lc_sem);
4103         if (init) {
4104                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4105         } else {
4106                 lo->ll_pos_last_checkpoint =
4107                                         lfsck->li_pos_current.lp_oit_cookie;
4108                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4109                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4110                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4111                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4112                 com->lc_new_checked = 0;
4113         }
4114
4115         rc = lfsck_layout_store(env, com);
4116         up_write(&com->lc_sem);
4117
4118         return rc;
4119 }
4120
4121 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4122                                          struct lfsck_component *com, bool init)
4123 {
4124         struct lfsck_instance   *lfsck = com->lc_lfsck;
4125         struct lfsck_layout     *lo    = com->lc_file_ram;
4126         int                      rc;
4127
4128         if (com->lc_new_checked == 0 && !init)
4129                 return 0;
4130
4131         down_write(&com->lc_sem);
4132
4133         if (init) {
4134                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4135         } else {
4136                 lo->ll_pos_last_checkpoint =
4137                                         lfsck->li_pos_current.lp_oit_cookie;
4138                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4139                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4140                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4141                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4142                 com->lc_new_checked = 0;
4143         }
4144
4145         rc = lfsck_layout_store(env, com);
4146
4147         up_write(&com->lc_sem);
4148
4149         return rc;
4150 }
4151
4152 static int lfsck_layout_prep(const struct lu_env *env,
4153                              struct lfsck_component *com,
4154                              struct lfsck_start *start)
4155 {
4156         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4157         struct lfsck_layout     *lo     = com->lc_file_ram;
4158         struct lfsck_position   *pos    = &com->lc_pos_start;
4159
4160         fid_zero(&pos->lp_dir_parent);
4161         pos->lp_dir_cookie = 0;
4162         if (lo->ll_status == LS_COMPLETED ||
4163             lo->ll_status == LS_PARTIAL ||
4164             /* To handle orphan, must scan from the beginning. */
4165             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4166                 int rc;
4167
4168                 rc = lfsck_layout_reset(env, com, false);
4169                 if (rc != 0)
4170                         return rc;
4171         }
4172
4173         down_write(&com->lc_sem);
4174         lo->ll_time_latest_start = cfs_time_current_sec();
4175         spin_lock(&lfsck->li_lock);
4176         if (lo->ll_flags & LF_SCANNED_ONCE) {
4177                 if (!lfsck->li_drop_dryrun ||
4178                     lo->ll_pos_first_inconsistent == 0) {
4179                         lo->ll_status = LS_SCANNING_PHASE2;
4180                         list_del_init(&com->lc_link);
4181                         list_add_tail(&com->lc_link,
4182                                       &lfsck->li_list_double_scan);
4183                         pos->lp_oit_cookie = 0;
4184                 } else {
4185                         int i;
4186
4187                         lo->ll_status = LS_SCANNING_PHASE1;
4188                         lo->ll_run_time_phase1 = 0;
4189                         lo->ll_run_time_phase2 = 0;
4190                         lo->ll_objs_checked_phase1 = 0;
4191                         lo->ll_objs_checked_phase2 = 0;
4192                         lo->ll_objs_failed_phase1 = 0;
4193                         lo->ll_objs_failed_phase2 = 0;
4194                         for (i = 0; i < LLIT_MAX; i++)
4195                                 lo->ll_objs_repaired[i] = 0;
4196
4197                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4198                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4199                 }
4200         } else {
4201                 lo->ll_status = LS_SCANNING_PHASE1;
4202                 if (!lfsck->li_drop_dryrun ||
4203                     lo->ll_pos_first_inconsistent == 0)
4204                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4205                 else
4206                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4207         }
4208         spin_unlock(&lfsck->li_lock);
4209         up_write(&com->lc_sem);
4210
4211         return 0;
4212 }
4213
4214 static int lfsck_layout_slave_prep(const struct lu_env *env,
4215                                    struct lfsck_component *com,
4216                                    struct lfsck_start_param *lsp)
4217 {
4218         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4219         struct lfsck_start              *start  = lsp->lsp_start;
4220         int                              rc;
4221
4222         rc = lfsck_layout_prep(env, com, start);
4223         if (rc != 0 || !lsp->lsp_index_valid)
4224                 return rc;
4225
4226         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4227         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4228                 LASSERT(!llsd->llsd_rbtree_valid);
4229
4230                 write_lock(&llsd->llsd_rb_lock);
4231                 rc = lfsck_rbtree_setup(env, com);
4232                 write_unlock(&llsd->llsd_rb_lock);
4233         }
4234
4235         return rc;
4236 }
4237
4238 static int lfsck_layout_master_prep(const struct lu_env *env,
4239                                     struct lfsck_component *com,
4240                                     struct lfsck_start_param *lsp)
4241 {
4242         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4243         struct lfsck_layout_master_data *llmd    = com->lc_data;
4244         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4245         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4246         struct lfsck_thread_args        *lta;
4247         long                             rc;
4248         ENTRY;
4249
4250         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4251         if (rc != 0)
4252                 RETURN(rc);
4253
4254         llmd->llmd_assistant_status = 0;
4255         llmd->llmd_post_result = 0;
4256         llmd->llmd_to_post = 0;
4257         llmd->llmd_to_double_scan = 0;
4258         llmd->llmd_in_double_scan = 0;
4259         llmd->llmd_exit = 0;
4260         thread_set_flags(athread, 0);
4261
4262         lta = lfsck_thread_args_init(lfsck, com, lsp);
4263         if (IS_ERR(lta))
4264                 RETURN(PTR_ERR(lta));
4265
4266         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
4267         if (IS_ERR_VALUE(rc)) {
4268                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
4269                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
4270                 lfsck_thread_args_fini(lta);
4271         } else {
4272                 struct l_wait_info lwi = { 0 };
4273
4274                 l_wait_event(mthread->t_ctl_waitq,
4275                              thread_is_running(athread) ||
4276                              thread_is_stopped(athread),
4277                              &lwi);
4278                 if (unlikely(!thread_is_running(athread)))
4279                         rc = llmd->llmd_assistant_status;
4280                 else
4281                         rc = 0;
4282         }
4283
4284         RETURN(rc);
4285 }
4286
4287 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4288 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4289                                      struct lfsck_component *com,
4290                                      struct dt_object *parent,
4291                                      struct lov_mds_md_v1 *lmm)
4292 {
4293         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4294         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4295         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4296         struct lfsck_layout             *lo      = com->lc_file_ram;
4297         struct lfsck_layout_master_data *llmd    = com->lc_data;
4298         struct lfsck_layout_object      *llo     = NULL;
4299         struct lov_ost_data_v1          *objs;
4300         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4301         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4302         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4303                 struct l_wait_info       lwi     = { 0 };
4304         struct lu_buf                   *buf;
4305         int                              rc      = 0;
4306         int                              i;
4307         __u32                            magic;
4308         __u16                            count;
4309         __u16                            gen;
4310         ENTRY;
4311
4312         buf = lfsck_buf_get(env, &info->lti_old_pfid,
4313                             sizeof(struct filter_fid_old));
4314         count = le16_to_cpu(lmm->lmm_stripe_count);
4315         gen = le16_to_cpu(lmm->lmm_layout_gen);
4316         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4317          * been verified in lfsck_layout_verify_header() already. If some
4318          * new magic introduced in the future, then layout LFSCK needs to
4319          * be updated also. */
4320         magic = le32_to_cpu(lmm->lmm_magic);
4321         if (magic == LOV_MAGIC_V1) {
4322                 objs = &(lmm->lmm_objects[0]);
4323         } else {
4324                 LASSERT(magic == LOV_MAGIC_V3);
4325                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4326         }
4327
4328         for (i = 0; i < count; i++, objs++) {
4329                 struct lu_fid           *fid    = &info->lti_fid;
4330                 struct ost_id           *oi     = &info->lti_oi;
4331                 struct lfsck_layout_req *llr;
4332                 struct lfsck_tgt_desc   *tgt    = NULL;
4333                 struct dt_object        *cobj   = NULL;
4334                 __u32                    index  =
4335                                         le32_to_cpu(objs->l_ost_idx);
4336                 bool                     wakeup = false;
4337
4338                 if (is_dummy_lov_ost_data(objs))
4339                         continue;
4340
4341                 l_wait_event(mthread->t_ctl_waitq,
4342                              bk->lb_async_windows == 0 ||
4343                              llmd->llmd_prefetched < bk->lb_async_windows ||
4344                              !thread_is_running(mthread) ||
4345                              thread_is_stopped(athread),
4346                              &lwi);
4347
4348                 if (unlikely(!thread_is_running(mthread)) ||
4349                              thread_is_stopped(athread))
4350                         GOTO(out, rc = 0);
4351
4352                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4353                 ostid_to_fid(fid, oi, index);
4354                 tgt = lfsck_tgt_get(ltds, index);
4355                 if (unlikely(tgt == NULL)) {
4356                         CERROR("%s: Cannot talk with OST %x which did not join "
4357                                "the layout LFSCK.\n",
4358                                lfsck_lfsck2name(lfsck), index);
4359                         lo->ll_flags |= LF_INCOMPLETE;
4360                         goto next;
4361                 }
4362
4363                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
4364                 if (IS_ERR(cobj)) {
4365                         rc = PTR_ERR(cobj);
4366                         goto next;
4367                 }
4368
4369                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4370                 if (rc != 0)
4371                         goto next;
4372
4373                 rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
4374                                           BYPASS_CAPA);
4375                 if (rc != 0)
4376                         goto next;
4377
4378                 if (llo == NULL) {
4379                         llo = lfsck_layout_object_init(env, parent, gen);
4380                         if (IS_ERR(llo)) {
4381                                 rc = PTR_ERR(llo);
4382                                 goto next;
4383                         }
4384                 }
4385
4386                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4387                 if (IS_ERR(llr)) {
4388                         rc = PTR_ERR(llr);
4389                         goto next;
4390                 }
4391
4392                 cobj = NULL;
4393                 spin_lock(&llmd->llmd_lock);
4394                 if (llmd->llmd_assistant_status < 0) {
4395                         spin_unlock(&llmd->llmd_lock);
4396                         lfsck_layout_req_fini(env, llr);
4397                         lfsck_tgt_put(tgt);
4398                         RETURN(llmd->llmd_assistant_status);
4399                 }
4400
4401                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4402                 if (llmd->llmd_prefetched == 0)
4403                         wakeup = true;
4404
4405                 llmd->llmd_prefetched++;
4406                 spin_unlock(&llmd->llmd_lock);
4407                 if (wakeup)
4408                         wake_up_all(&athread->t_ctl_waitq);
4409
4410 next:
4411                 down_write(&com->lc_sem);
4412                 com->lc_new_checked++;
4413                 if (rc < 0)
4414                         lo->ll_objs_failed_phase1++;
4415                 up_write(&com->lc_sem);
4416
4417                 if (cobj != NULL && !IS_ERR(cobj))
4418                         lu_object_put(env, &cobj->do_lu);
4419
4420                 if (likely(tgt != NULL))
4421                         lfsck_tgt_put(tgt);
4422
4423                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4424                         GOTO(out, rc);
4425         }
4426
4427         GOTO(out, rc = 0);
4428
4429 out:
4430         if (llo != NULL && !IS_ERR(llo))
4431                 lfsck_layout_object_put(env, llo);
4432
4433         return rc;
4434 }
4435
4436 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4437  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4438  * list ::llmd_req_list.
4439  *
4440  * For each request on above list, the lfsck_layout_assistant thread compares
4441  * the OST side attribute with local attribute, if inconsistent, then repair it.
4442  *
4443  * All above processing is async mode with pipeline. */
4444 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4445                                         struct lfsck_component *com,
4446                                         struct dt_object *obj)
4447 {
4448         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4449         struct ost_id                   *oi     = &info->lti_oi;
4450         struct lfsck_layout             *lo     = com->lc_file_ram;
4451         struct lfsck_layout_master_data *llmd   = com->lc_data;
4452         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4453         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4454         struct thandle                  *handle = NULL;
4455         struct lu_buf                   *buf    = &info->lti_big_buf;
4456         struct lov_mds_md_v1            *lmm    = NULL;
4457         struct dt_device                *dev    = lfsck->li_bottom;
4458         struct lustre_handle             lh     = { 0 };
4459         ssize_t                          buflen = buf->lb_len;
4460         int                              rc     = 0;
4461         bool                             locked = false;
4462         bool                             stripe = false;
4463         ENTRY;
4464
4465         if (!S_ISREG(lfsck_object_type(obj)))
4466                 GOTO(out, rc = 0);
4467
4468         if (llmd->llmd_assistant_status < 0)
4469                 GOTO(out, rc = -ESRCH);
4470
4471         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4472         lmm_oi_cpu_to_le(oi, oi);
4473         dt_read_lock(env, obj, 0);
4474         locked = true;
4475
4476 again:
4477         rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
4478         if (rc <= 0)
4479                 GOTO(out, rc);
4480
4481         buf->lb_len = rc;
4482         lmm = buf->lb_buf;
4483         rc = lfsck_layout_verify_header(lmm);
4484         if (rc != 0)
4485                 GOTO(out, rc);
4486
4487         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4488                 GOTO(out, stripe = true);
4489
4490         /* Inconsistent lmm_oi, should be repaired. */
4491         CDEBUG(D_LFSCK, "Repair bad lmm_oi for "DFID"\n",
4492                PFID(lfsck_dto2fid(obj)));
4493
4494         if (bk->lb_param & LPF_DRYRUN) {
4495                 down_write(&com->lc_sem);
4496                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4497                 up_write(&com->lc_sem);
4498
4499                 GOTO(out, stripe = true);
4500         }
4501
4502         if (!lustre_handle_is_used(&lh)) {
4503                 dt_read_unlock(env, obj);
4504                 locked = false;
4505                 buf->lb_len = buflen;
4506                 rc = lfsck_layout_lock(env, com, obj, &lh,
4507                                        MDS_INODELOCK_LAYOUT |
4508                                        MDS_INODELOCK_XATTR);
4509                 if (rc != 0)
4510                         GOTO(out, rc);
4511
4512                 handle = dt_trans_create(env, dev);
4513                 if (IS_ERR(handle))
4514                         GOTO(out, rc = PTR_ERR(handle));
4515
4516                 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4517                                           LU_XATTR_REPLACE, handle);
4518                 if (rc != 0)
4519                         GOTO(out, rc);
4520
4521                 rc = dt_trans_start_local(env, dev, handle);
4522                 if (rc != 0)
4523                         GOTO(out, rc);
4524
4525                 dt_write_lock(env, obj, 0);
4526                 locked = true;
4527
4528                 goto again;
4529         }
4530
4531         lmm->lmm_oi = *oi;
4532         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4533                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4534         if (rc != 0)
4535                 GOTO(out, rc);
4536
4537         down_write(&com->lc_sem);
4538         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4539         up_write(&com->lc_sem);
4540
4541         GOTO(out, stripe = true);
4542
4543 out:
4544         if (locked) {
4545                 if (lustre_handle_is_used(&lh))
4546                         dt_write_unlock(env, obj);
4547                 else
4548                         dt_read_unlock(env, obj);
4549         }
4550
4551         if (handle != NULL && !IS_ERR(handle))
4552                 dt_trans_stop(env, dev, handle);
4553
4554         lfsck_layout_unlock(&lh);
4555         if (stripe) {
4556                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4557         } else {
4558                 down_write(&com->lc_sem);
4559                 com->lc_new_checked++;
4560                 if (rc < 0)
4561                         lo->ll_objs_failed_phase1++;
4562                 up_write(&com->lc_sem);
4563         }
4564         buf->lb_len = buflen;
4565
4566         return rc;
4567 }
4568
4569 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4570                                        struct lfsck_component *com,
4571                                        struct dt_object *obj)
4572 {
4573         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4574         struct lfsck_layout             *lo     = com->lc_file_ram;
4575         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4576         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4577         struct lfsck_layout_seq         *lls;
4578         __u64                            seq;
4579         __u64                            oid;
4580         int                              rc;
4581         ENTRY;
4582
4583         LASSERT(llsd != NULL);
4584
4585         lfsck_rbtree_update_bitmap(env, com, fid, false);
4586
4587         down_write(&com->lc_sem);
4588         if (fid_is_idif(fid))
4589                 seq = 0;
4590         else if (!fid_is_norm(fid) ||
4591                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4592                 GOTO(unlock, rc = 0);
4593         else
4594                 seq = fid_seq(fid);
4595         com->lc_new_checked++;
4596
4597         lls = lfsck_layout_seq_lookup(llsd, seq);
4598         if (lls == NULL) {
4599                 OBD_ALLOC_PTR(lls);
4600                 if (unlikely(lls == NULL))
4601                         GOTO(unlock, rc = -ENOMEM);
4602
4603                 INIT_LIST_HEAD(&lls->lls_list);
4604                 lls->lls_seq = seq;
4605                 rc = lfsck_layout_lastid_load(env, com, lls);
4606                 if (rc != 0) {
4607                         lo->ll_objs_failed_phase1++;
4608                         OBD_FREE_PTR(lls);
4609                         GOTO(unlock, rc);
4610                 }
4611
4612                 lfsck_layout_seq_insert(llsd, lls);
4613         }
4614
4615         if (unlikely(fid_is_last_id(fid)))
4616                 GOTO(unlock, rc = 0);
4617
4618         oid = fid_oid(fid);
4619         if (oid > lls->lls_lastid_known)
4620                 lls->lls_lastid_known = oid;
4621
4622         if (oid > lls->lls_lastid) {
4623                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4624                         /* OFD may create new objects during LFSCK scanning. */
4625                         rc = lfsck_layout_lastid_reload(env, com, lls);
4626                         if (unlikely(rc != 0))
4627                                 CWARN("%s: failed to reload LAST_ID for "LPX64
4628                                       ": rc = %d\n",
4629                                       lfsck_lfsck2name(com->lc_lfsck),
4630                                       lls->lls_seq, rc);
4631                         if (oid <= lls->lls_lastid)
4632                                 GOTO(unlock, rc = 0);
4633
4634                         LASSERT(lfsck->li_out_notify != NULL);
4635
4636                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4637                                              LE_LASTID_REBUILDING);
4638                         lo->ll_flags |= LF_CRASHED_LASTID;
4639                 }
4640
4641                 lls->lls_lastid = oid;
4642                 lls->lls_dirty = 1;
4643         }
4644
4645         GOTO(unlock, rc = 0);
4646
4647 unlock:
4648         up_write(&com->lc_sem);
4649
4650         return rc;
4651 }
4652
/*
 * Directory-entry hook of the layout LFSCK component.
 *
 * No per-entry work is done here: all arguments are ignored and 0 is
 * always returned.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        return 0;
}
4660
4661 static int lfsck_layout_master_post(const struct lu_env *env,
4662                                     struct lfsck_component *com,
4663                                     int result, bool init)
4664 {
4665         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4666         struct lfsck_layout             *lo      = com->lc_file_ram;
4667         struct lfsck_layout_master_data *llmd    = com->lc_data;
4668         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4669         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4670         struct l_wait_info               lwi     = { 0 };
4671         int                              rc;
4672         ENTRY;
4673
4674
4675         llmd->llmd_post_result = result;
4676         llmd->llmd_to_post = 1;
4677         if (llmd->llmd_post_result <= 0)
4678                 llmd->llmd_exit = 1;
4679
4680         wake_up_all(&athread->t_ctl_waitq);
4681         l_wait_event(mthread->t_ctl_waitq,
4682                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
4683                      thread_is_stopped(athread),
4684                      &lwi);
4685
4686         if (llmd->llmd_assistant_status < 0)
4687                 result = llmd->llmd_assistant_status;
4688
4689         down_write(&com->lc_sem);
4690         spin_lock(&lfsck->li_lock);
4691         /* When LFSCK failed, there may be some prefetched objects those are
4692          * not been processed yet, we do not know the exactly position, then
4693          * just restart from last check-point next time. */
4694         if (!init && !llmd->llmd_exit)
4695                 lo->ll_pos_last_checkpoint =
4696                                         lfsck->li_pos_current.lp_oit_cookie;
4697
4698         if (result > 0) {
4699                 lo->ll_status = LS_SCANNING_PHASE2;
4700                 lo->ll_flags |= LF_SCANNED_ONCE;
4701                 lo->ll_flags &= ~LF_UPGRADE;
4702                 list_del_init(&com->lc_link);
4703                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4704         } else if (result == 0) {
4705                 lo->ll_status = lfsck->li_status;
4706                 if (lo->ll_status == 0)
4707                         lo->ll_status = LS_STOPPED;
4708                 if (lo->ll_status != LS_PAUSED) {
4709                         list_del_init(&com->lc_link);
4710                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4711                 }
4712         } else {
4713                 lo->ll_status = LS_FAILED;
4714                 list_del_init(&com->lc_link);
4715                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4716         }
4717         spin_unlock(&lfsck->li_lock);
4718
4719         if (!init) {
4720                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4721                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4722                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4723                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4724                 com->lc_new_checked = 0;
4725         }
4726
4727         rc = lfsck_layout_store(env, com);
4728         up_write(&com->lc_sem);
4729
4730         RETURN(rc);
4731 }
4732
4733 static int lfsck_layout_slave_post(const struct lu_env *env,
4734                                    struct lfsck_component *com,
4735                                    int result, bool init)
4736 {
4737         struct lfsck_instance   *lfsck = com->lc_lfsck;
4738         struct lfsck_layout     *lo    = com->lc_file_ram;
4739         int                      rc;
4740         bool                     done  = false;
4741
4742         rc = lfsck_layout_lastid_store(env, com);
4743         if (rc != 0)
4744                 result = rc;
4745
4746         LASSERT(lfsck->li_out_notify != NULL);
4747
4748         down_write(&com->lc_sem);
4749
4750         spin_lock(&lfsck->li_lock);
4751         if (!init)
4752                 lo->ll_pos_last_checkpoint =
4753                                         lfsck->li_pos_current.lp_oit_cookie;
4754         if (result > 0) {
4755                 lo->ll_status = LS_SCANNING_PHASE2;
4756                 lo->ll_flags |= LF_SCANNED_ONCE;
4757                 if (lo->ll_flags & LF_CRASHED_LASTID) {
4758                         done = true;
4759                         lo->ll_flags &= ~LF_CRASHED_LASTID;
4760                 }
4761                 lo->ll_flags &= ~LF_UPGRADE;
4762                 list_del_init(&com->lc_link);
4763                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4764         } else if (result == 0) {
4765                 lo->ll_status = lfsck->li_status;
4766                 if (lo->ll_status == 0)
4767                         lo->ll_status = LS_STOPPED;
4768                 if (lo->ll_status != LS_PAUSED) {
4769                         list_del_init(&com->lc_link);
4770                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4771                 }
4772         } else {
4773                 lo->ll_status = LS_FAILED;
4774                 list_del_init(&com->lc_link);
4775                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4776         }
4777         spin_unlock(&lfsck->li_lock);
4778
4779         if (done)
4780                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4781                                      LE_LASTID_REBUILT);
4782
4783         if (!init) {
4784                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4785                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4786                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4787                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4788                 com->lc_new_checked = 0;
4789         }
4790
4791         rc = lfsck_layout_store(env, com);
4792
4793         up_write(&com->lc_sem);
4794
4795         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4796
4797         if (result <= 0)
4798                 lfsck_rbtree_cleanup(env, com);
4799
4800         return rc;
4801 }
4802
4803 static int lfsck_layout_dump(const struct lu_env *env,
4804                              struct lfsck_component *com, char *buf, int len)
4805 {
4806         struct lfsck_instance   *lfsck = com->lc_lfsck;
4807         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
4808         struct lfsck_layout     *lo    = com->lc_file_ram;
4809         int                      save  = len;
4810         int                      ret   = -ENOSPC;
4811         int                      rc;
4812
4813         down_read(&com->lc_sem);
4814         rc = snprintf(buf, len,
4815                       "name: lfsck_layout\n"
4816                       "magic: %#x\n"
4817                       "version: %d\n"
4818                       "status: %s\n",
4819                       lo->ll_magic,
4820                       bk->lb_version,
4821                       lfsck_status2names(lo->ll_status));
4822         if (rc <= 0)
4823                 goto out;
4824
4825         buf += rc;
4826         len -= rc;
4827         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
4828                              "flags");
4829         if (rc < 0)
4830                 goto out;
4831
4832         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
4833                              "param");
4834         if (rc < 0)
4835                 goto out;
4836
4837         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
4838                              "time_since_last_completed");
4839         if (rc < 0)
4840                 goto out;
4841
4842         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
4843                              "time_since_latest_start");
4844         if (rc < 0)
4845                 goto out;
4846
4847         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
4848                              "time_since_last_checkpoint");
4849         if (rc < 0)
4850                 goto out;
4851
4852         rc = snprintf(buf, len,
4853                       "latest_start_position: "LPU64"\n"
4854                       "last_checkpoint_position: "LPU64"\n"
4855                       "first_failure_position: "LPU64"\n",
4856                       lo->ll_pos_latest_start,
4857                       lo->ll_pos_last_checkpoint,
4858                       lo->ll_pos_first_inconsistent);
4859         if (rc <= 0)
4860                 goto out;
4861
4862         buf += rc;
4863         len -= rc;
4864
4865         rc = snprintf(buf, len,
4866                       "success_count: %u\n"
4867                       "repaired_dangling: "LPU64"\n"
4868                       "repaired_unmatched_pair: "LPU64"\n"
4869                       "repaired_multiple_referenced: "LPU64"\n"
4870                       "repaired_orphan: "LPU64"\n"
4871                       "repaired_inconsistent_owner: "LPU64"\n"
4872                       "repaired_others: "LPU64"\n"
4873                       "skipped: "LPU64"\n"
4874                       "failed_phase1: "LPU64"\n"
4875                       "failed_phase2: "LPU64"\n",
4876                       lo->ll_success_count,
4877                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
4878                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4879                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4880                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4881                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4882                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
4883                       lo->ll_objs_skipped,
4884                       lo->ll_objs_failed_phase1,
4885                       lo->ll_objs_failed_phase2);
4886         if (rc <= 0)
4887                 goto out;
4888
4889         buf += rc;
4890         len -= rc;
4891
4892         if (lo->ll_status == LS_SCANNING_PHASE1) {
4893                 __u64 pos;
4894                 const struct dt_it_ops *iops;
4895                 cfs_duration_t duration = cfs_time_current() -
4896                                           lfsck->li_time_last_checkpoint;
4897                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4898                 __u64 speed = checked;
4899                 __u64 new_checked = com->lc_new_checked * HZ;
4900                 __u32 rtime = lo->ll_run_time_phase1 +
4901                               cfs_duration_sec(duration + HALF_SEC);
4902
4903                 if (duration != 0)
4904                         do_div(new_checked, duration);
4905                 if (rtime != 0)
4906                         do_div(speed, rtime);
4907                 rc = snprintf(buf, len,
4908                               "checked_phase1: "LPU64"\n"
4909                               "checked_phase2: "LPU64"\n"
4910                               "run_time_phase1: %u seconds\n"
4911                               "run_time_phase2: %u seconds\n"
4912                               "average_speed_phase1: "LPU64" items/sec\n"
4913                               "average_speed_phase2: N/A\n"
4914                               "real-time_speed_phase1: "LPU64" items/sec\n"
4915                               "real-time_speed_phase2: N/A\n",
4916                               checked,
4917                               lo->ll_objs_checked_phase2,
4918                               rtime,
4919                               lo->ll_run_time_phase2,
4920                               speed,
4921                               new_checked);
4922                 if (rc <= 0)
4923                         goto out;
4924
4925                 buf += rc;
4926                 len -= rc;
4927
4928                 LASSERT(lfsck->li_di_oit != NULL);
4929
4930                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4931
4932                 /* The low layer otable-based iteration position may NOT
4933                  * exactly match the layout-based directory traversal
4934                  * cookie. Generally, it is not a serious issue. But the
4935                  * caller should NOT make assumption on that. */
4936                 pos = iops->store(env, lfsck->li_di_oit);
4937                 if (!lfsck->li_current_oit_processed)
4938                         pos--;
4939                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
4940                 if (rc <= 0)
4941                         goto out;
4942
4943                 buf += rc;
4944                 len -= rc;
4945         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4946                 cfs_duration_t duration = cfs_time_current() -
4947                                           lfsck->li_time_last_checkpoint;
4948                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4949                 __u64 speed = checked;
4950                 __u64 new_checked = com->lc_new_checked * HZ;
4951                 __u32 rtime = lo->ll_run_time_phase1 +
4952                               cfs_duration_sec(duration + HALF_SEC);
4953
4954                 if (duration != 0)
4955                         do_div(new_checked, duration);
4956                 if (rtime != 0)
4957                         do_div(speed, rtime);
4958                 rc = snprintf(buf, len,
4959                               "checked_phase1: "LPU64"\n"
4960                               "checked_phase2: "LPU64"\n"
4961                               "run_time_phase1: %u seconds\n"
4962                               "run_time_phase2: %u seconds\n"
4963                               "average_speed_phase1: "LPU64" items/sec\n"
4964                               "average_speed_phase2: N/A\n"
4965                               "real-time_speed_phase1: "LPU64" items/sec\n"
4966                               "real-time_speed_phase2: N/A\n"
4967                               "current_position: "DFID"\n",
4968                               checked,
4969                               lo->ll_objs_checked_phase2,
4970                               rtime,
4971                               lo->ll_run_time_phase2,
4972                               speed,
4973                               new_checked,
4974                               PFID(&com->lc_fid_latest_scanned_phase2));
4975                 if (rc <= 0)
4976                         goto out;
4977
4978                 buf += rc;
4979                 len -= rc;
4980         } else {
4981                 __u64 speed1 = lo->ll_objs_checked_phase1;
4982                 __u64 speed2 = lo->ll_objs_checked_phase2;
4983
4984                 if (lo->ll_run_time_phase1 != 0)
4985                         do_div(speed1, lo->ll_run_time_phase1);
4986                 if (lo->ll_run_time_phase2 != 0)
4987                         do_div(speed2, lo->ll_run_time_phase2);
4988                 rc = snprintf(buf, len,
4989                               "checked_phase1: "LPU64"\n"
4990                               "checked_phase2: "LPU64"\n"
4991                               "run_time_phase1: %u seconds\n"
4992                               "run_time_phase2: %u seconds\n"
4993                               "average_speed_phase1: "LPU64" items/sec\n"
4994                               "average_speed_phase2: "LPU64" objs/sec\n"
4995                               "real-time_speed_phase1: N/A\n"
4996                               "real-time_speed_phase2: N/A\n"
4997                               "current_position: N/A\n",
4998                               lo->ll_objs_checked_phase1,
4999                               lo->ll_objs_checked_phase2,
5000                               lo->ll_run_time_phase1,
5001                               lo->ll_run_time_phase2,
5002                               speed1,
5003                               speed2);
5004                 if (rc <= 0)
5005                         goto out;
5006
5007                 buf += rc;
5008                 len -= rc;
5009         }
5010         ret = save - len;
5011
5012 out:
5013         up_read(&com->lc_sem);
5014
5015         return ret;
5016 }
5017
5018 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5019                                            struct lfsck_component *com)
5020 {
5021         struct lfsck_layout_master_data *llmd    = com->lc_data;
5022         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5023         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5024         struct lfsck_layout             *lo      = com->lc_file_ram;
5025         struct l_wait_info               lwi     = { 0 };
5026
5027         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5028                 return 0;
5029
5030         llmd->llmd_to_double_scan = 1;
5031         wake_up_all(&athread->t_ctl_waitq);
5032         l_wait_event(mthread->t_ctl_waitq,
5033                      llmd->llmd_in_double_scan ||
5034                      thread_is_stopped(athread),
5035                      &lwi);
5036         if (llmd->llmd_assistant_status < 0)
5037                 return llmd->llmd_assistant_status;
5038
5039         return 0;
5040 }
5041
5042 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5043                                           struct lfsck_component *com)
5044 {
5045         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5046         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5047         struct lfsck_layout             *lo     = com->lc_file_ram;
5048         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5049         int                              rc;
5050         ENTRY;
5051
5052         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5053                 lfsck_rbtree_cleanup(env, com);
5054                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5055                 RETURN(0);
5056         }
5057
5058         atomic_inc(&lfsck->li_double_scan_count);
5059
5060         com->lc_new_checked = 0;
5061         com->lc_new_scanned = 0;
5062         com->lc_time_last_checkpoint = cfs_time_current();
5063         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5064                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5065
5066         while (1) {
5067                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5068                                                      NULL, NULL);
5069
5070                 rc = lfsck_layout_slave_query_master(env, com);
5071                 if (list_empty(&llsd->llsd_master_list)) {
5072                         if (unlikely(!thread_is_running(thread)))
5073                                 rc = 0;
5074                         else
5075                                 rc = 1;
5076
5077                         GOTO(done, rc);
5078                 }
5079
5080                 if (rc < 0)
5081                         GOTO(done, rc);
5082
5083                 rc = l_wait_event(thread->t_ctl_waitq,
5084                                   !thread_is_running(thread) ||
5085                                   list_empty(&llsd->llsd_master_list),
5086                                   &lwi);
5087                 if (unlikely(!thread_is_running(thread)))
5088                         GOTO(done, rc = 0);
5089
5090                 if (rc == -ETIMEDOUT)
5091                         continue;
5092
5093                 GOTO(done, rc = (rc < 0 ? rc : 1));
5094         }
5095
5096 done:
5097         rc = lfsck_layout_double_scan_result(env, com, rc);
5098
5099         lfsck_rbtree_cleanup(env, com);
5100         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5101         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5102                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5103
5104         return rc;
5105 }
5106
5107 static void lfsck_layout_master_data_release(const struct lu_env *env,
5108                                              struct lfsck_component *com)
5109 {
5110         struct lfsck_layout_master_data *llmd   = com->lc_data;
5111         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5112         struct lfsck_tgt_descs          *ltds;
5113         struct lfsck_tgt_desc           *ltd;
5114         struct lfsck_tgt_desc           *next;
5115
5116         LASSERT(llmd != NULL);
5117         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5118                 thread_is_stopped(&llmd->llmd_thread));
5119         LASSERT(list_empty(&llmd->llmd_req_list));
5120
5121         com->lc_data = NULL;
5122
5123         ltds = &lfsck->li_ost_descs;
5124         spin_lock(&ltds->ltd_lock);
5125         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5126                                  ltd_layout_phase_list) {
5127                 list_del_init(&ltd->ltd_layout_phase_list);
5128         }
5129         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5130                                  ltd_layout_phase_list) {
5131                 list_del_init(&ltd->ltd_layout_phase_list);
5132         }
5133         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5134                                  ltd_layout_list) {
5135                 list_del_init(&ltd->ltd_layout_list);
5136         }
5137         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5138                                  ltd_layout_phase_list) {
5139                 list_del_init(&ltd->ltd_layout_phase_list);
5140         }
5141         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5142                                  ltd_layout_phase_list) {
5143                 list_del_init(&ltd->ltd_layout_phase_list);
5144         }
5145         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5146                                  ltd_layout_list) {
5147                 list_del_init(&ltd->ltd_layout_list);
5148         }
5149         spin_unlock(&ltds->ltd_lock);
5150
5151         OBD_FREE_PTR(llmd);
5152 }
5153
5154 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5155                                             struct lfsck_component *com)
5156 {
5157         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5158         struct lfsck_layout_seq          *lls;
5159         struct lfsck_layout_seq          *next;
5160         struct lfsck_layout_slave_target *llst;
5161         struct lfsck_layout_slave_target *tmp;
5162
5163         LASSERT(llsd != NULL);
5164
5165         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5166                                      lls_list) {
5167                 list_del_init(&lls->lls_list);
5168                 lfsck_object_put(env, lls->lls_lastid_obj);
5169                 OBD_FREE_PTR(lls);
5170         }
5171
5172         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5173                                  llst_list) {
5174                 list_del_init(&llst->llst_list);
5175                 OBD_FREE_PTR(llst);
5176         }
5177
5178         lfsck_rbtree_cleanup(env, com);
5179         com->lc_data = NULL;
5180         OBD_FREE_PTR(llsd);
5181 }
5182
5183 static void lfsck_layout_master_quit(const struct lu_env *env,
5184                                      struct lfsck_component *com)
5185 {
5186         struct lfsck_layout_master_data *llmd    = com->lc_data;
5187         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5188         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5189         struct l_wait_info               lwi     = { 0 };
5190
5191         llmd->llmd_exit = 1;
5192         wake_up_all(&athread->t_ctl_waitq);
5193         l_wait_event(mthread->t_ctl_waitq,
5194                      thread_is_init(athread) ||
5195                      thread_is_stopped(athread),
5196                      &lwi);
5197 }
5198
/*
 * Quit hook of the layout LFSCK slave: only the rbtree is cleaned up.
 *
 * \param[in] env       execution environment
 * \param[in] com       the layout LFSCK component
 */
static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
5204
5205 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5206                                          struct lfsck_component *com,
5207                                          struct lfsck_request *lr)
5208 {
5209         struct lfsck_instance           *lfsck = com->lc_lfsck;
5210         struct lfsck_layout             *lo    = com->lc_file_ram;
5211         struct lfsck_layout_master_data *llmd  = com->lc_data;
5212         struct lfsck_tgt_descs          *ltds;
5213         struct lfsck_tgt_desc           *ltd;
5214         bool                             fail  = false;
5215         ENTRY;
5216
5217         if (lr->lr_event == LE_PAIRS_VERIFY) {
5218                 int rc;
5219
5220                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5221                                                      &lr->lr_fid2);
5222
5223                 RETURN(rc);
5224         }
5225
5226         if (lr->lr_event != LE_PHASE1_DONE &&
5227             lr->lr_event != LE_PHASE2_DONE &&
5228             lr->lr_event != LE_PEER_EXIT)
5229                 RETURN(-EINVAL);
5230
5231         if (lr->lr_flags & LEF_FROM_OST)
5232                 ltds = &lfsck->li_ost_descs;
5233         else
5234                 ltds = &lfsck->li_mdt_descs;
5235         spin_lock(&ltds->ltd_lock);
5236         ltd = LTD_TGT(ltds, lr->lr_index);
5237         if (ltd == NULL) {
5238                 spin_unlock(&ltds->ltd_lock);
5239
5240                 RETURN(-ENODEV);
5241         }
5242
5243         list_del_init(&ltd->ltd_layout_phase_list);
5244         switch (lr->lr_event) {
5245         case LE_PHASE1_DONE:
5246                 if (lr->lr_status <= 0) {
5247                         ltd->ltd_layout_done = 1;
5248                         list_del_init(&ltd->ltd_layout_list);
5249                         CWARN("%s: %s %x failed/stopped at phase1: rc = %d.\n",
5250                               lfsck_lfsck2name(lfsck),
5251                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5252                               ltd->ltd_index, lr->lr_status);
5253                         lo->ll_flags |= LF_INCOMPLETE;
5254                         fail = true;
5255                         break;
5256                 }
5257
5258                 if (lr->lr_flags & LEF_FROM_OST) {
5259                         if (list_empty(&ltd->ltd_layout_list))
5260                                 list_add_tail(&ltd->ltd_layout_list,
5261                                               &llmd->llmd_ost_list);
5262                         list_add_tail(&ltd->ltd_layout_phase_list,
5263                                       &llmd->llmd_ost_phase2_list);
5264                 } else {
5265                         if (list_empty(&ltd->ltd_layout_list))
5266                                 list_add_tail(&ltd->ltd_layout_list,
5267                                               &llmd->llmd_mdt_list);
5268                         list_add_tail(&ltd->ltd_layout_phase_list,
5269                                       &llmd->llmd_mdt_phase2_list);
5270                 }
5271                 break;
5272         case LE_PHASE2_DONE:
5273                 ltd->ltd_layout_done = 1;
5274                 list_del_init(&ltd->ltd_layout_list);
5275                 break;
5276         case LE_PEER_EXIT:
5277                 fail = true;
5278                 ltd->ltd_layout_done = 1;
5279                 list_del_init(&ltd->ltd_layout_list);
5280                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
5281                         CWARN("%s: the peer %s %x exit layout LFSCK.\n",
5282                               lfsck_lfsck2name(lfsck),
5283                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5284                               ltd->ltd_index);
5285                         lo->ll_flags |= LF_INCOMPLETE;
5286                 }
5287                 break;
5288         default:
5289                 break;
5290         }
5291         spin_unlock(&ltds->ltd_lock);
5292
5293         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5294                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5295
5296                 memset(stop, 0, sizeof(*stop));
5297                 stop->ls_status = lr->lr_status;
5298                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5299                 lfsck_stop(env, lfsck->li_bottom, stop);
5300         } else if (lfsck_layout_master_to_orphan(llmd)) {
5301                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5302         }
5303
5304         RETURN(0);
5305 }
5306
5307 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5308                                         struct lfsck_component *com,
5309                                         struct lfsck_request *lr)
5310 {
5311         struct lfsck_instance            *lfsck = com->lc_lfsck;
5312         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5313         struct lfsck_layout_slave_target *llst;
5314         int                               rc;
5315         ENTRY;
5316
5317         switch (lr->lr_event) {
5318         case LE_FID_ACCESSED:
5319                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5320                 RETURN(0);
5321         case LE_CONDITIONAL_DESTROY:
5322                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5323                 RETURN(rc);
5324         case LE_PAIRS_VERIFY: {
5325                 lr->lr_status = LPVS_INIT;
5326                 /* Firstly, if the MDT-object which is claimed via OST-object
5327                  * local stored PFID xattr recognizes the OST-object, then it
5328                  * must be that the client given PFID is wrong. */
5329                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5330                                                     &lr->lr_fid3);
5331                 if (rc <= 0)
5332                         RETURN(0);
5333
5334                 lr->lr_status = LPVS_INCONSISTENT;
5335                 /* The OST-object local stored PFID xattr is stale. We need to
5336                  * check whether the MDT-object that is claimed via the client
5337                  * given PFID information recognizes the OST-object or not. If
5338                  * matches, then need to update the OST-object's PFID xattr. */
5339                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5340                                                     &lr->lr_fid2);
5341                 /* For rc < 0 case:
5342                  * We are not sure whether the client given PFID information
5343                  * is correct or not, do nothing to avoid improper fixing.
5344                  *
5345                  * For rc > 0 case:
5346                  * The client given PFID information is also invalid, we can
5347                  * NOT fix the OST-object inconsistency.
5348                  */
5349                 if (rc != 0)
5350                         RETURN(rc);
5351
5352                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5353                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5354
5355                 RETURN(rc);
5356         }
5357         case LE_PHASE2_DONE:
5358         case LE_PEER_EXIT:
5359                 break;
5360         default:
5361                 RETURN(-EINVAL);
5362         }
5363
5364         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5365         if (llst == NULL)
5366                 RETURN(-ENODEV);
5367
5368         lfsck_layout_llst_put(llst);
5369         if (list_empty(&llsd->llsd_master_list))
5370                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5371
5372         if (lr->lr_event == LE_PEER_EXIT &&
5373             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5374                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5375
5376                 memset(stop, 0, sizeof(*stop));
5377                 stop->ls_status = lr->lr_status;
5378                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5379                 lfsck_stop(env, lfsck->li_bottom, stop);
5380         }
5381
5382         RETURN(0);
5383 }
5384
5385 static int lfsck_layout_query(const struct lu_env *env,
5386                               struct lfsck_component *com)
5387 {
5388         struct lfsck_layout *lo = com->lc_file_ram;
5389
5390         return lo->ll_status;
5391 }
5392
5393 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5394                                            struct lfsck_component *com,
5395                                            struct lfsck_tgt_descs *ltds,
5396                                            struct lfsck_tgt_desc *ltd,
5397                                            struct ptlrpc_request_set *set)
5398 {
5399         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5400         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5401         struct lfsck_request              *lr    = &info->lti_lr;
5402         struct lfsck_instance             *lfsck = com->lc_lfsck;
5403         int                                rc;
5404
5405         spin_lock(&ltds->ltd_lock);
5406         if (list_empty(&ltd->ltd_layout_list)) {
5407                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5408                 spin_unlock(&ltds->ltd_lock);
5409
5410                 return 0;
5411         }
5412
5413         list_del_init(&ltd->ltd_layout_phase_list);
5414         list_del_init(&ltd->ltd_layout_list);
5415         spin_unlock(&ltds->ltd_lock);
5416
5417         memset(lr, 0, sizeof(*lr));
5418         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5419         lr->lr_event = LE_PEER_EXIT;
5420         lr->lr_active = LT_LAYOUT;
5421         lr->lr_status = LS_CO_PAUSED;
5422         if (ltds == &lfsck->li_ost_descs)
5423                 lr->lr_flags = LEF_TO_OST;
5424
5425         laia->laia_com = com;
5426         laia->laia_ltds = ltds;
5427         atomic_inc(&ltd->ltd_ref);
5428         laia->laia_ltd = ltd;
5429         laia->laia_lr = lr;
5430         laia->laia_shared = 0;
5431
5432         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5433                                  lfsck_layout_master_async_interpret,
5434                                  laia, LFSCK_NOTIFY);
5435         if (rc != 0) {
5436                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
5437                        lfsck_lfsck2name(lfsck),
5438                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5439                        ltd->ltd_index, rc);
5440                 lfsck_tgt_put(ltd);
5441         }
5442
5443         return rc;
5444 }
5445
5446 /* with lfsck::li_lock held */
5447 static int lfsck_layout_slave_join(const struct lu_env *env,
5448                                    struct lfsck_component *com,
5449                                    struct lfsck_start_param *lsp)
5450 {
5451         struct lfsck_instance            *lfsck = com->lc_lfsck;
5452         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5453         struct lfsck_layout_slave_target *llst;
5454         struct lfsck_start               *start = lsp->lsp_start;
5455         int                               rc    = 0;
5456         ENTRY;
5457
5458         if (!lsp->lsp_index_valid || start == NULL ||
5459             !(start->ls_flags & LPF_ALL_TGT) ||
5460             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT))
5461                 RETURN(-EALREADY);
5462
5463         spin_unlock(&lfsck->li_lock);
5464         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5465         spin_lock(&lfsck->li_lock);
5466         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5467                 spin_unlock(&lfsck->li_lock);
5468                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5469                                                       true);
5470                 if (llst != NULL)
5471                         lfsck_layout_llst_put(llst);
5472                 spin_lock(&lfsck->li_lock);
5473                 rc = -EAGAIN;
5474         }
5475
5476         RETURN(rc);
5477 }
5478
5479 static struct lfsck_operations lfsck_layout_master_ops = {
5480         .lfsck_reset            = lfsck_layout_reset,
5481         .lfsck_fail             = lfsck_layout_fail,
5482         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
5483         .lfsck_prep             = lfsck_layout_master_prep,
5484         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
5485         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5486         .lfsck_post             = lfsck_layout_master_post,
5487         .lfsck_interpret        = lfsck_layout_master_async_interpret,
5488         .lfsck_dump             = lfsck_layout_dump,
5489         .lfsck_double_scan      = lfsck_layout_master_double_scan,
5490         .lfsck_data_release     = lfsck_layout_master_data_release,
5491         .lfsck_quit             = lfsck_layout_master_quit,
5492         .lfsck_in_notify        = lfsck_layout_master_in_notify,
5493         .lfsck_query            = lfsck_layout_query,
5494         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
5495 };
5496
5497 static struct lfsck_operations lfsck_layout_slave_ops = {
5498         .lfsck_reset            = lfsck_layout_reset,
5499         .lfsck_fail             = lfsck_layout_fail,
5500         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
5501         .lfsck_prep             = lfsck_layout_slave_prep,
5502         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
5503         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5504         .lfsck_post             = lfsck_layout_slave_post,
5505         .lfsck_dump             = lfsck_layout_dump,
5506         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5507         .lfsck_data_release     = lfsck_layout_slave_data_release,
5508         .lfsck_quit             = lfsck_layout_slave_quit,
5509         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5510         .lfsck_query            = lfsck_layout_query,
5511         .lfsck_join             = lfsck_layout_slave_join,
5512 };
5513
5514 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5515 {
5516         struct lfsck_component  *com;
5517         struct lfsck_layout     *lo;
5518         struct dt_object        *root = NULL;
5519         struct dt_object        *obj;
5520         int                      rc;
5521         ENTRY;
5522
5523         OBD_ALLOC_PTR(com);
5524         if (com == NULL)
5525                 RETURN(-ENOMEM);
5526
5527         INIT_LIST_HEAD(&com->lc_link);
5528         INIT_LIST_HEAD(&com->lc_link_dir);
5529         init_rwsem(&com->lc_sem);
5530         atomic_set(&com->lc_ref, 1);
5531         com->lc_lfsck = lfsck;
5532         com->lc_type = LT_LAYOUT;
5533         if (lfsck->li_master) {
5534                 struct lfsck_layout_master_data *llmd;
5535
5536                 com->lc_ops = &lfsck_layout_master_ops;
5537                 OBD_ALLOC_PTR(llmd);
5538                 if (llmd == NULL)
5539                         GOTO(out, rc = -ENOMEM);
5540
5541                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5542                 spin_lock_init(&llmd->llmd_lock);
5543                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5544                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5545                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5546                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5547                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5548                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5549                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5550                 com->lc_data = llmd;
5551         } else {
5552                 struct lfsck_layout_slave_data *llsd;
5553
5554                 com->lc_ops = &lfsck_layout_slave_ops;
5555                 OBD_ALLOC_PTR(llsd);
5556                 if (llsd == NULL)
5557                         GOTO(out, rc = -ENOMEM);
5558
5559                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5560                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5561                 spin_lock_init(&llsd->llsd_lock);
5562                 llsd->llsd_rb_root = RB_ROOT;
5563                 rwlock_init(&llsd->llsd_rb_lock);
5564                 com->lc_data = llsd;
5565         }
5566         com->lc_file_size = sizeof(*lo);
5567         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5568         if (com->lc_file_ram == NULL)
5569                 GOTO(out, rc = -ENOMEM);
5570
5571         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5572         if (com->lc_file_disk == NULL)
5573                 GOTO(out, rc = -ENOMEM);
5574
5575         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5576         if (IS_ERR(root))
5577                 GOTO(out, rc = PTR_ERR(root));
5578
5579         if (unlikely(!dt_try_as_dir(env, root)))
5580                 GOTO(out, rc = -ENOTDIR);
5581
5582         obj = local_file_find_or_create(env, lfsck->li_los, root,
5583                                         lfsck_layout_name,
5584                                         S_IFREG | S_IRUGO | S_IWUSR);
5585         if (IS_ERR(obj))
5586                 GOTO(out, rc = PTR_ERR(obj));
5587
5588         com->lc_obj = obj;
5589         rc = lfsck_layout_load(env, com);
5590         if (rc > 0)
5591                 rc = lfsck_layout_reset(env, com, true);
5592         else if (rc == -ENOENT)
5593                 rc = lfsck_layout_init(env, com);
5594
5595         if (rc != 0)
5596                 GOTO(out, rc);
5597
5598         lo = com->lc_file_ram;
5599         switch (lo->ll_status) {
5600         case LS_INIT:
5601         case LS_COMPLETED:
5602         case LS_FAILED:
5603         case LS_STOPPED:
5604         case LS_PARTIAL:
5605                 spin_lock(&lfsck->li_lock);
5606                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5607                 spin_unlock(&lfsck->li_lock);
5608                 break;
5609         default:
5610                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
5611                        lfsck_lfsck2name(lfsck), lo->ll_status);
5612                 /* fall through */
5613         case LS_SCANNING_PHASE1:
5614         case LS_SCANNING_PHASE2:
5615                 /* No need to store the status to disk right now.
5616                  * If the system crashed before the status stored,
5617                  * it will be loaded back when next time. */
5618                 lo->ll_status = LS_CRASHED;
5619                 lo->ll_flags |= LF_INCOMPLETE;
5620                 /* fall through */
5621         case LS_PAUSED:
5622         case LS_CRASHED:
5623         case LS_CO_FAILED:
5624         case LS_CO_STOPPED:
5625         case LS_CO_PAUSED:
5626                 spin_lock(&lfsck->li_lock);
5627                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5628                 spin_unlock(&lfsck->li_lock);
5629                 break;
5630         }
5631
5632         if (lo->ll_flags & LF_CRASHED_LASTID) {
5633                 LASSERT(lfsck->li_out_notify != NULL);
5634
5635                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5636                                      LE_LASTID_REBUILDING);
5637         }
5638
5639         GOTO(out, rc = 0);
5640
5641 out:
5642         if (root != NULL && !IS_ERR(root))
5643                 lu_object_put(env, &root->do_lu);
5644
5645         if (rc != 0)
5646                 lfsck_component_cleanup(env, com);
5647
5648         return rc;
5649 }
5650
5651 struct lfsck_orphan_it {
5652         struct lfsck_component           *loi_com;
5653         struct lfsck_rbtree_node         *loi_lrn;
5654         struct lfsck_layout_slave_target *loi_llst;
5655         struct lu_fid                     loi_key;
5656         struct lu_orphan_rec              loi_rec;
5657         __u64                             loi_hash;
5658         unsigned int                      loi_over:1;
5659 };
5660
5661 static int lfsck_fid_match_idx(const struct lu_env *env,
5662                                struct lfsck_instance *lfsck,
5663                                const struct lu_fid *fid, int idx)
5664 {
5665         struct seq_server_site  *ss;
5666         struct lu_server_fld    *sf;
5667         struct lu_seq_range      range  = { 0 };
5668         int                      rc;
5669
5670         /* All abnormal cases will be returned to MDT0. */
5671         if (!fid_is_norm(fid)) {
5672                 if (idx == 0)
5673                         return 1;
5674
5675                 return 0;
5676         }
5677
5678         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5679         if (unlikely(ss == NULL))
5680                 return -ENOTCONN;
5681
5682         sf = ss->ss_server_fld;
5683         LASSERT(sf != NULL);
5684
5685         fld_range_set_any(&range);
5686         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
5687         if (rc != 0)
5688                 return rc;
5689
5690         if (!fld_range_is_mdt(&range))
5691                 return -EINVAL;
5692
5693         if (range.lsr_index == idx)
5694                 return 1;
5695
5696         return 0;
5697 }
5698
5699 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5700                                         struct dt_device *dev,
5701                                         struct dt_object *obj)
5702 {
5703         struct thandle *handle;
5704         int             rc;
5705         ENTRY;
5706
5707         handle = dt_trans_create(env, dev);
5708         if (IS_ERR(handle))
5709                 RETURN_EXIT;
5710
5711         rc = dt_declare_ref_del(env, obj, handle);
5712         if (rc != 0)
5713                 GOTO(stop, rc);
5714
5715         rc = dt_declare_destroy(env, obj, handle);
5716         if (rc != 0)
5717                 GOTO(stop, rc);
5718
5719         rc = dt_trans_start_local(env, dev, handle);
5720         if (rc != 0)
5721                 GOTO(stop, rc);
5722
5723         dt_write_lock(env, obj, 0);
5724         rc = dt_ref_del(env, obj, handle);
5725         if (rc == 0)
5726                 rc = dt_destroy(env, obj, handle);
5727         dt_write_unlock(env, obj);
5728
5729         GOTO(stop, rc);
5730
5731 stop:
5732         dt_trans_stop(env, dev, handle);
5733
5734         RETURN_EXIT;
5735 }
5736
/**
 * Index lookup stub: the operation is not supported.
 *
 * \retval -EOPNOTSUPP  always
 */
static int lfsck_orphan_index_lookup(const struct lu_env *env,
                                     struct dt_object *dt, struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
5745
/**
 * Index declare-insert stub: the operation is not supported.
 *
 * \retval -EOPNOTSUPP  always
 */
static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_rec *rec,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
5754
/**
 * Index insert stub: the operation is not supported.
 *
 * \retval -EOPNOTSUPP  always
 */
static int lfsck_orphan_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa,
                                     int ignore_quota)
{
        return -EOPNOTSUPP;
}
5765
/**
 * Index declare-delete stub: the operation is not supported.
 *
 * \retval -EOPNOTSUPP  always
 */
static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
5773
/**
 * Index delete stub: the operation is not supported.
 *
 * \retval -EOPNOTSUPP  always
 */
static int lfsck_orphan_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
5782
5783 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5784                                           struct dt_object *dt,
5785                                           __u32 attr,
5786                                           struct lustre_capa *capa)
5787 {
5788         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
5789         struct lfsck_instance           *lfsck;
5790         struct lfsck_component          *com    = NULL;
5791         struct lfsck_layout_slave_data  *llsd;
5792         struct lfsck_orphan_it          *it     = NULL;
5793         int                              rc     = 0;
5794         ENTRY;
5795
5796         lfsck = lfsck_instance_find(dev, true, false);
5797         if (unlikely(lfsck == NULL))
5798                 RETURN(ERR_PTR(-ENODEV));
5799
5800         com = lfsck_component_find(lfsck, LT_LAYOUT);
5801         if (unlikely(com == NULL))
5802                 GOTO(out, rc = -ENOENT);
5803
5804         llsd = com->lc_data;
5805         if (!llsd->llsd_rbtree_valid)
5806                 GOTO(out, rc = -ESRCH);
5807
5808         OBD_ALLOC_PTR(it);
5809         if (it == NULL)
5810                 GOTO(out, rc = -ENOMEM);
5811
5812         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5813         if (it->loi_llst == NULL)
5814                 GOTO(out, rc = -ENODEV);
5815
5816         if (dev->dd_record_fid_accessed) {
5817                 /* The first iteration against the rbtree, scan the whole rbtree
5818                  * to remove the nodes which do NOT need to be handled. */
5819                 write_lock(&llsd->llsd_rb_lock);
5820                 if (dev->dd_record_fid_accessed) {
5821                         struct rb_node                  *node;
5822                         struct rb_node                  *next;
5823                         struct lfsck_rbtree_node        *lrn;
5824
5825                         /* No need to record the fid accessing anymore. */
5826                         dev->dd_record_fid_accessed = 0;
5827
5828                         node = rb_first(&llsd->llsd_rb_root);
5829                         while (node != NULL) {
5830                                 next = rb_next(node);
5831                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
5832                                                lrn_node);
5833                                 if (atomic_read(&lrn->lrn_known_count) <=
5834                                     atomic_read(&lrn->lrn_accessed_count)) {
5835                                         rb_erase(node, &llsd->llsd_rb_root);
5836                                         lfsck_rbtree_free(lrn);
5837                                 }
5838                                 node = next;
5839                         }
5840                 }
5841                 write_unlock(&llsd->llsd_rb_lock);
5842         }
5843
5844         /* read lock the rbtree when init, and unlock when fini */
5845         read_lock(&llsd->llsd_rb_lock);
5846         it->loi_com = com;
5847         com = NULL;
5848
5849         GOTO(out, rc = 0);
5850
5851 out:
5852         if (com != NULL)
5853                 lfsck_component_put(env, com);
5854         lfsck_instance_put(env, lfsck);
5855         if (rc != 0) {
5856                 if (it != NULL)
5857                         OBD_FREE_PTR(it);
5858
5859                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5860         }
5861
5862         return (struct dt_it *)it;
5863 }
5864
5865 static void lfsck_orphan_it_fini(const struct lu_env *env,
5866                                  struct dt_it *di)
5867 {
5868         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
5869         struct lfsck_component           *com   = it->loi_com;
5870         struct lfsck_layout_slave_data   *llsd;
5871         struct lfsck_layout_slave_target *llst;
5872
5873         if (com != NULL) {
5874                 llsd = com->lc_data;
5875                 read_unlock(&llsd->llsd_rb_lock);
5876                 llst = it->loi_llst;
5877                 LASSERT(llst != NULL);
5878
5879                 /* Save the key and hash for iterate next. */
5880                 llst->llst_fid = it->loi_key;
5881                 llst->llst_hash = it->loi_hash;
5882                 lfsck_layout_llst_put(llst);
5883                 lfsck_component_put(env, com);
5884         }
5885         OBD_FREE_PTR(it);
5886 }
5887
5888 /**
5889  * \retval       +1: the iteration finished
5890  * \retval        0: on success, not finished
5891  * \retval      -ve: on error
5892  */
5893 static int lfsck_orphan_it_next(const struct lu_env *env,
5894                                 struct dt_it *di)
5895 {
5896         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5897         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
5898         struct lu_attr                  *la     = &info->lti_la;
5899         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
5900         struct lu_fid                   *key    = &it->loi_key;
5901         struct lu_orphan_rec            *rec    = &it->loi_rec;
5902         struct lfsck_component          *com    = it->loi_com;
5903         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5904         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5905         struct dt_object                *obj;
5906         struct lfsck_rbtree_node        *lrn;
5907         int                              pos;
5908         int                              rc;
5909         __u32                            save;
5910         __u32                            idx    = it->loi_llst->llst_index;
5911         bool                             exact  = false;
5912         ENTRY;
5913
5914         if (it->loi_over)
5915                 RETURN(1);
5916
5917 again0:
5918         lrn = it->loi_lrn;
5919         if (lrn == NULL) {
5920                 lrn = lfsck_rbtree_search(llsd, key, &exact);
5921                 if (lrn == NULL) {
5922                         it->loi_over = 1;
5923                         RETURN(1);
5924                 }
5925
5926                 it->loi_lrn = lrn;
5927                 if (!exact) {
5928                         key->f_seq = lrn->lrn_seq;
5929                         key->f_oid = lrn->lrn_first_oid;
5930                         key->f_ver = 0;
5931                 }
5932         } else {
5933                 key->f_oid++;
5934                 if (unlikely(key->f_oid == 0)) {
5935                         key->f_seq++;
5936                         it->loi_lrn = NULL;
5937                         goto again0;
5938                 }
5939
5940                 if (key->f_oid >=
5941                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
5942                         it->loi_lrn = NULL;
5943                         goto again0;
5944                 }
5945         }
5946
5947         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5948                      atomic_read(&lrn->lrn_accessed_count))) {
5949                 struct rb_node *next = rb_next(&lrn->lrn_node);
5950
5951                 while (next != NULL) {
5952                         lrn = rb_entry(next, struct lfsck_rbtree_node,
5953                                        lrn_node);
5954                         if (atomic_read(&lrn->lrn_known_count) >
5955                             atomic_read(&lrn->lrn_accessed_count))
5956                                 break;
5957                         next = rb_next(next);
5958                 }
5959
5960                 if (next == NULL) {
5961                         it->loi_over = 1;
5962                         RETURN(1);
5963                 }
5964
5965                 it->loi_lrn = lrn;
5966                 key->f_seq = lrn->lrn_seq;
5967                 key->f_oid = lrn->lrn_first_oid;
5968                 key->f_ver = 0;
5969         }
5970
5971         pos = key->f_oid - lrn->lrn_first_oid;
5972
5973 again1:
5974         pos = find_next_bit(lrn->lrn_known_bitmap,
5975                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
5976         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5977                 key->f_oid = lrn->lrn_first_oid + pos;
5978                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
5979                         key->f_seq++;
5980                         key->f_oid = 0;
5981                 }
5982                 it->loi_lrn = NULL;
5983                 goto again0;
5984         }
5985
5986         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5987                 pos++;
5988                 goto again1;
5989         }
5990
5991         key->f_oid = lrn->lrn_first_oid + pos;
5992         obj = lfsck_object_find(env, lfsck, key);
5993         if (IS_ERR(obj)) {
5994                 rc = PTR_ERR(obj);
5995                 if (rc == -ENOENT) {
5996                         pos++;
5997                         goto again1;
5998                 }
5999                 RETURN(rc);
6000         }
6001
6002         dt_read_lock(env, obj, 0);
6003         if (!dt_object_exists(obj)) {
6004                 dt_read_unlock(env, obj);
6005                 lfsck_object_put(env, obj);
6006                 pos++;
6007                 goto again1;
6008         }
6009
6010         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
6011         if (rc != 0)
6012                 GOTO(out, rc);
6013
6014         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6015                           XATTR_NAME_FID, BYPASS_CAPA);
6016         if (rc == -ENODATA) {
6017                 /* For the pre-created OST-object, update the bitmap to avoid
6018                  * others LFSCK (second phase) iteration to touch it again. */
6019                 if (la->la_ctime == 0) {
6020                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6021                                 atomic_inc(&lrn->lrn_accessed_count);
6022
6023                         /* For the race between repairing dangling referenced
6024                          * MDT-object and unlink the file, it may left orphan
6025                          * OST-object there. Destroy it now! */
6026                         if (unlikely(!(la->la_mode & S_ISUID))) {
6027                                 dt_read_unlock(env, obj);
6028                                 lfsck_layout_destroy_orphan(env,
6029                                                             lfsck->li_bottom,
6030                                                             obj);
6031                                 lfsck_object_put(env, obj);
6032                                 pos++;
6033                                 goto again1;
6034                         }
6035                 } else if (idx == 0) {
6036                         /* If the orphan OST-object has no parent information,
6037                          * regard it as referenced by the MDT-object on MDT0. */
6038                         fid_zero(&rec->lor_fid);
6039                         rec->lor_uid = la->la_uid;
6040                         rec->lor_gid = la->la_gid;
6041                         GOTO(out, rc = 0);
6042                 }
6043
6044                 dt_read_unlock(env, obj);
6045                 lfsck_object_put(env, obj);
6046                 pos++;
6047                 goto again1;
6048         }
6049
6050         if (rc < 0)
6051                 GOTO(out, rc);
6052
6053         if (rc != sizeof(struct filter_fid) &&
6054             rc != sizeof(struct filter_fid_old))
6055                 GOTO(out, rc = -EINVAL);
6056
6057         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6058         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6059          * MDT-object's FID::f_ver, instead it is the OST-object index in its
6060          * parent MDT-object's layout EA. */
6061         save = rec->lor_fid.f_stripe_idx;
6062         rec->lor_fid.f_ver = 0;
6063         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6064         /* If the orphan OST-object does not claim the MDT, then next.
6065          *
6066          * If we do not know whether it matches or not, then return it
6067          * to the MDT for further check. */
6068         if (rc == 0) {
6069                 dt_read_unlock(env, obj);
6070                 lfsck_object_put(env, obj);
6071                 pos++;
6072                 goto again1;
6073         }
6074
6075         rec->lor_fid.f_stripe_idx = save;
6076         rec->lor_uid = la->la_uid;
6077         rec->lor_gid = la->la_gid;
6078
6079         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6080                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6081                rec->lor_uid, rec->lor_gid);
6082
6083         GOTO(out, rc = 0);
6084
6085 out:
6086         dt_read_unlock(env, obj);
6087         lfsck_object_put(env, obj);
6088         if (rc == 0)
6089                 it->loi_hash++;
6090
6091         return rc;
6092 }
6093
6094 /**
6095  * \retval       +1: locate to the exactly position
6096  * \retval        0: cannot locate to the exactly position,
6097  *                   call next() to move to a valid position.
6098  * \retval      -ve: on error
6099  */
6100 static int lfsck_orphan_it_get(const struct lu_env *env,
6101                                struct dt_it *di,
6102                                const struct dt_key *key)
6103 {
6104         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6105         int                      rc;
6106
6107         it->loi_key = *(struct lu_fid *)key;
6108         rc = lfsck_orphan_it_next(env, di);
6109         if (rc == 1)
6110                 return 0;
6111
6112         if (rc == 0)
6113                 return 1;
6114
6115         return rc;
6116 }
6117
/**
 * Counterpart of lfsck_orphan_it_get(); intentionally a no-op.
 *
 * \param[in] env	execution environment (unused)
 * \param[in] di	the iterator (unused)
 */
static void lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
	/* Nothing to do here. */
}
6122
6123 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6124                                           const struct dt_it *di)
6125 {
6126         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6127
6128         return (struct dt_key *)&it->loi_key;
6129 }
6130
6131 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6132                                     const struct dt_it *di)
6133 {
6134         return sizeof(struct lu_fid);
6135 }
6136
6137 static int lfsck_orphan_it_rec(const struct lu_env *env,
6138                                const struct dt_it *di,
6139                                struct dt_rec *rec,
6140                                __u32 attr)
6141 {
6142         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6143
6144         *(struct lu_orphan_rec *)rec = it->loi_rec;
6145
6146         return 0;
6147 }
6148
6149 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6150                                    const struct dt_it *di)
6151 {
6152         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6153
6154         return it->loi_hash;
6155 }
6156
6157 /**
6158  * \retval       +1: locate to the exactly position
6159  * \retval        0: cannot locate to the exactly position,
6160  *                   call next() to move to a valid position.
6161  * \retval      -ve: on error
6162  */
6163 static int lfsck_orphan_it_load(const struct lu_env *env,
6164                                 const struct dt_it *di,
6165                                 __u64 hash)
6166 {
6167         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6168         struct lfsck_layout_slave_target *llst = it->loi_llst;
6169         int                               rc;
6170
6171         LASSERT(llst != NULL);
6172
6173         if (hash != llst->llst_hash) {
6174                 CWARN("%s: the given hash "LPU64" for orphan iteration does "
6175                       "not match the one when fini "LPU64", to be reset.\n",
6176                       lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6177                       llst->llst_hash);
6178                 fid_zero(&llst->llst_fid);
6179                 llst->llst_hash = 0;
6180         }
6181
6182         it->loi_key = llst->llst_fid;
6183         it->loi_hash = llst->llst_hash;
6184         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6185         if (rc == 1)
6186                 return 0;
6187
6188         if (rc == 0)
6189                 return 1;
6190
6191         return rc;
6192 }
6193
/**
 * The key_rec() method of the orphan iterator; it does nothing.
 *
 * \param[in] env	execution environment (unused)
 * \param[in] di	the iterator (unused)
 * \param[in] key_rec	caller's buffer; it is left untouched
 *
 * \retval		0 always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
				   const struct dt_it *di,
				   void *key_rec)
{
	/* Neither the iterator nor the buffer is accessed. */
	const int rc = 0;

	return rc;
}
6200
6201 const struct dt_index_operations lfsck_orphan_index_ops = {
6202         .dio_lookup             = lfsck_orphan_index_lookup,
6203         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
6204         .dio_insert             = lfsck_orphan_index_insert,
6205         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
6206         .dio_delete             = lfsck_orphan_index_delete,
6207         .dio_it = {
6208                 .init           = lfsck_orphan_it_init,
6209                 .fini           = lfsck_orphan_it_fini,
6210                 .get            = lfsck_orphan_it_get,
6211                 .put            = lfsck_orphan_it_put,
6212                 .next           = lfsck_orphan_it_next,
6213                 .key            = lfsck_orphan_it_key,
6214                 .key_size       = lfsck_orphan_it_key_size,
6215                 .rec            = lfsck_orphan_it_rec,
6216                 .store          = lfsck_orphan_it_store,
6217                 .load           = lfsck_orphan_it_load,
6218                 .key_rec        = lfsck_orphan_it_key_rec,
6219         }
6220 };