Whamcloud - gitweb
d658c60ed8e69de26b32f74bc5660a9793a464ad
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_linkea.h>
43 #include <lustre_fid.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <md_object.h>
48 #include <obd_class.h>
49
50 #include "lfsck_internal.h"
51
52 #define LFSCK_LAYOUT_MAGIC              0xB173AE14
53
54 static const char lfsck_layout_name[] = "lfsck_layout";
55
56 struct lfsck_layout_seq {
57         struct list_head         lls_list;
58         __u64                    lls_seq;
59         __u64                    lls_lastid;
60         __u64                    lls_lastid_known;
61         struct dt_object        *lls_lastid_obj;
62         unsigned int             lls_dirty:1;
63 };
64
65 struct lfsck_layout_slave_target {
66         /* link into lfsck_layout_slave_data::llsd_master_list. */
67         struct list_head        llst_list;
68         /* The position for next record in the rbtree for iteration. */
69         struct lu_fid           llst_fid;
70         /* Dummy hash for iteration against the rbtree. */
71         __u64                   llst_hash;
72         __u64                   llst_gen;
73         atomic_t                llst_ref;
74         __u32                   llst_index;
75 };
76
77 struct lfsck_layout_slave_data {
78         /* list for lfsck_layout_seq */
79         struct list_head         llsd_seq_list;
80
81         /* list for the masters involve layout verification. */
82         struct list_head         llsd_master_list;
83         spinlock_t               llsd_lock;
84         __u64                    llsd_touch_gen;
85         struct dt_object        *llsd_rb_obj;
86         struct rb_root           llsd_rb_root;
87         rwlock_t                 llsd_rb_lock;
88         unsigned int             llsd_rbtree_valid:1;
89 };
90
91 struct lfsck_layout_object {
92         struct dt_object        *llo_obj;
93         struct lu_attr           llo_attr;
94         atomic_t                 llo_ref;
95         __u16                    llo_gen;
96 };
97
98 struct lfsck_layout_req {
99         struct list_head                 llr_list;
100         struct lfsck_layout_object      *llr_parent;
101         struct dt_object                *llr_child;
102         __u32                            llr_ost_idx;
103         __u32                            llr_lov_idx; /* offset in LOV EA */
104 };
105
106 struct lfsck_layout_master_data {
107         spinlock_t              llmd_lock;
108         struct list_head        llmd_req_list;
109
110         /* list for the ost targets involve layout verification. */
111         struct list_head        llmd_ost_list;
112
113         /* list for the ost targets in phase1 scanning. */
114         struct list_head        llmd_ost_phase1_list;
115
116         /* list for the ost targets in phase1 scanning. */
117         struct list_head        llmd_ost_phase2_list;
118
119         /* list for the mdt targets involve layout verification. */
120         struct list_head        llmd_mdt_list;
121
122         /* list for the mdt targets in phase1 scanning. */
123         struct list_head        llmd_mdt_phase1_list;
124
125         /* list for the mdt targets in phase1 scanning. */
126         struct list_head        llmd_mdt_phase2_list;
127
128         struct ptlrpc_thread    llmd_thread;
129         __u32                   llmd_touch_gen;
130         int                     llmd_prefetched;
131         int                     llmd_assistant_status;
132         int                     llmd_post_result;
133         unsigned int            llmd_to_post:1,
134                                 llmd_to_double_scan:1,
135                                 llmd_in_double_scan:1,
136                                 llmd_exit:1;
137 };
138
/* Argument bundle: an export, the owning component and a slave target.
 * NOTE(review): presumably carried by an asynchronous request issued from
 * the slave side -- the consumer is not visible here, confirm. */
struct lfsck_layout_slave_async_args {
	struct obd_export *llsaa_exp;
	struct lfsck_component *llsaa_com;
	struct lfsck_layout_slave_target *llsaa_llst;
};
144
145 static struct lfsck_layout_object *
146 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
147                          __u16 gen)
148 {
149         struct lfsck_layout_object *llo;
150         int                         rc;
151
152         OBD_ALLOC_PTR(llo);
153         if (llo == NULL)
154                 return ERR_PTR(-ENOMEM);
155
156         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
157         if (rc != 0) {
158                 OBD_FREE_PTR(llo);
159
160                 return ERR_PTR(rc);
161         }
162
163         lu_object_get(&obj->do_lu);
164         llo->llo_obj = obj;
165         /* The gen can be used to check whether some others have changed the
166          * file layout after LFSCK pre-fetching but before real verification. */
167         llo->llo_gen = gen;
168         atomic_set(&llo->llo_ref, 1);
169
170         return llo;
171 }
172
173 static inline void
174 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
175 {
176         if (atomic_dec_and_test(&llst->llst_ref)) {
177                 LASSERT(list_empty(&llst->llst_list));
178
179                 OBD_FREE_PTR(llst);
180         }
181 }
182
183 static inline int
184 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
185 {
186         struct lfsck_layout_slave_target *llst;
187         struct lfsck_layout_slave_target *tmp;
188         int                               rc   = 0;
189
190         OBD_ALLOC_PTR(llst);
191         if (llst == NULL)
192                 return -ENOMEM;
193
194         INIT_LIST_HEAD(&llst->llst_list);
195         llst->llst_gen = 0;
196         llst->llst_index = index;
197         atomic_set(&llst->llst_ref, 1);
198
199         spin_lock(&llsd->llsd_lock);
200         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
201                 if (tmp->llst_index == index) {
202                         rc = -EALREADY;
203                         break;
204                 }
205         }
206         if (rc == 0)
207                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
208         spin_unlock(&llsd->llsd_lock);
209
210         if (rc != 0)
211                 OBD_FREE_PTR(llst);
212
213         return rc;
214 }
215
216 static inline void
217 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
218                       struct lfsck_layout_slave_target *llst)
219 {
220         bool del = false;
221
222         spin_lock(&llsd->llsd_lock);
223         if (!list_empty(&llst->llst_list)) {
224                 list_del_init(&llst->llst_list);
225                 del = true;
226         }
227         spin_unlock(&llsd->llsd_lock);
228
229         if (del)
230                 lfsck_layout_llst_put(llst);
231 }
232
233 static inline struct lfsck_layout_slave_target *
234 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
235                                __u32 index, bool unlink)
236 {
237         struct lfsck_layout_slave_target *llst;
238
239         spin_lock(&llsd->llsd_lock);
240         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
241                 if (llst->llst_index == index) {
242                         if (unlink)
243                                 list_del_init(&llst->llst_list);
244                         else
245                                 atomic_inc(&llst->llst_ref);
246                         spin_unlock(&llsd->llsd_lock);
247
248                         return llst;
249                 }
250         }
251         spin_unlock(&llsd->llsd_lock);
252
253         return NULL;
254 }
255
256 static inline void lfsck_layout_object_put(const struct lu_env *env,
257                                            struct lfsck_layout_object *llo)
258 {
259         if (atomic_dec_and_test(&llo->llo_ref)) {
260                 lfsck_object_put(env, llo->llo_obj);
261                 OBD_FREE_PTR(llo);
262         }
263 }
264
265 static struct lfsck_layout_req *
266 lfsck_layout_req_init(struct lfsck_layout_object *parent,
267                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
268 {
269         struct lfsck_layout_req *llr;
270
271         OBD_ALLOC_PTR(llr);
272         if (llr == NULL)
273                 return ERR_PTR(-ENOMEM);
274
275         INIT_LIST_HEAD(&llr->llr_list);
276         atomic_inc(&parent->llo_ref);
277         llr->llr_parent = parent;
278         llr->llr_child = child;
279         llr->llr_ost_idx = ost_idx;
280         llr->llr_lov_idx = lov_idx;
281
282         return llr;
283 }
284
285 static inline void lfsck_layout_req_fini(const struct lu_env *env,
286                                          struct lfsck_layout_req *llr)
287 {
288         lu_object_put(env, &llr->llr_child->do_lu);
289         lfsck_layout_object_put(env, llr->llr_parent);
290         OBD_FREE_PTR(llr);
291 }
292
293 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
294 {
295         bool empty = false;
296
297         spin_lock(&llmd->llmd_lock);
298         if (list_empty(&llmd->llmd_req_list))
299                 empty = true;
300         spin_unlock(&llmd->llmd_lock);
301
302         return empty;
303 }
304
305 static int lfsck_layout_get_lovea(const struct lu_env *env,
306                                   struct dt_object *obj,
307                                   struct lu_buf *buf, ssize_t *buflen)
308 {
309         int rc;
310
311 again:
312         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
313         if (rc == -ERANGE) {
314                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
315                                   BYPASS_CAPA);
316                 if (rc <= 0)
317                         return rc;
318
319                 lu_buf_realloc(buf, rc);
320                 if (buflen != NULL)
321                         *buflen = buf->lb_len;
322
323                 if (buf->lb_buf == NULL)
324                         return -ENOMEM;
325
326                 goto again;
327         }
328
329         if (rc == -ENODATA)
330                 rc = 0;
331
332         if (rc <= 0)
333                 return rc;
334
335         if (unlikely(buf->lb_buf == NULL)) {
336                 lu_buf_alloc(buf, rc);
337                 if (buflen != NULL)
338                         *buflen = buf->lb_len;
339
340                 if (buf->lb_buf == NULL)
341                         return -ENOMEM;
342
343                 goto again;
344         }
345
346         return rc;
347 }
348
349 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
350 {
351         __u32 magic;
352         __u32 patten;
353
354         magic = le32_to_cpu(lmm->lmm_magic);
355         /* If magic crashed, keep it there. Sometime later, during OST-object
356          * orphan handling, if some OST-object(s) back-point to it, it can be
357          * verified and repaired. */
358         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
359                 return -EINVAL;
360
361         patten = le32_to_cpu(lmm->lmm_pattern);
362         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
363         if (patten != LOV_PATTERN_RAID0)
364                 return -EOPNOTSUPP;
365
366         return 0;
367 }
368
/*
 * Geometry of the per-node bitmaps in the layout rbtree:
 *
 * LFSCK_RBTREE_BITMAP_SIZE	bytes allocated for each bitmap
 * LFSCK_RBTREE_BITMAP_WIDTH	bits in each bitmap, i.e. the count of OIDs
 *				covered by one rbtree node
 * LFSCK_RBTREE_BITMAP_MASK	mask extracting the in-node bit index from
 *				an OID; its complement yields the first OID
 *				of the node
 *
 * The mask must be derived from the WIDTH (bits), not from the SIZE
 * (bytes). Otherwise nodes get aligned on SIZE while covering WIDTH OIDs,
 * and OIDs that differ only above the low log2(SIZE) bits share one bit.
 */
#define LFSCK_RBTREE_BITMAP_SIZE	PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH	(LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK	(LFSCK_RBTREE_BITMAP_WIDTH - 1)
372
373 struct lfsck_rbtree_node {
374         struct rb_node   lrn_node;
375         __u64            lrn_seq;
376         __u32            lrn_first_oid;
377         atomic_t         lrn_known_count;
378         atomic_t         lrn_accessed_count;
379         void            *lrn_known_bitmap;
380         void            *lrn_accessed_bitmap;
381 };
382
383 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
384                                    __u64 seq, __u32 oid)
385 {
386         if (seq < lrn->lrn_seq)
387                 return -1;
388
389         if (seq > lrn->lrn_seq)
390                 return 1;
391
392         if (oid < lrn->lrn_first_oid)
393                 return -1;
394
395         if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH)
396                 return 1;
397
398         return 0;
399 }
400
401 /* The caller should hold llsd->llsd_rb_lock. */
402 static struct lfsck_rbtree_node *
403 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
404                     const struct lu_fid *fid, bool *exact)
405 {
406         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
407         struct rb_node           *prev  = NULL;
408         struct lfsck_rbtree_node *lrn   = NULL;
409         int                       rc    = 0;
410
411         if (exact != NULL)
412                 *exact = true;
413
414         while (node != NULL) {
415                 prev = node;
416                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
417                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
418                 if (rc < 0)
419                         node = node->rb_left;
420                 else if (rc > 0)
421                         node = node->rb_right;
422                 else
423                         return lrn;
424         }
425
426         if (exact == NULL)
427                 return NULL;
428
429         /* If there is no exactly matched one, then to the next valid one. */
430         *exact = false;
431
432         /* The rbtree is empty. */
433         if (rc == 0)
434                 return NULL;
435
436         if (rc < 0)
437                 return lrn;
438
439         node = rb_next(prev);
440
441         /* The end of the rbtree. */
442         if (node == NULL)
443                 return NULL;
444
445         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
446
447         return lrn;
448 }
449
450 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
451                                                   const struct lu_fid *fid)
452 {
453         struct lfsck_rbtree_node *lrn;
454
455         OBD_ALLOC_PTR(lrn);
456         if (lrn == NULL)
457                 return ERR_PTR(-ENOMEM);
458
459         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
460         if (lrn->lrn_known_bitmap == NULL) {
461                 OBD_FREE_PTR(lrn);
462
463                 return ERR_PTR(-ENOMEM);
464         }
465
466         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
467         if (lrn->lrn_accessed_bitmap == NULL) {
468                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
469                 OBD_FREE_PTR(lrn);
470
471                 return ERR_PTR(-ENOMEM);
472         }
473
474         rb_init_node(&lrn->lrn_node);
475         lrn->lrn_seq = fid_seq(fid);
476         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
477         atomic_set(&lrn->lrn_known_count, 0);
478         atomic_set(&lrn->lrn_accessed_count, 0);
479
480         return lrn;
481 }
482
483 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
484 {
485         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
486         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
487         OBD_FREE_PTR(lrn);
488 }
489
490 /* The caller should hold lock. */
491 static struct lfsck_rbtree_node *
492 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
493                     struct lfsck_rbtree_node *lrn)
494 {
495         struct rb_node           **pos    = &(llsd->llsd_rb_root.rb_node);
496         struct rb_node            *parent = NULL;
497         struct lfsck_rbtree_node  *tmp;
498         int                        rc;
499
500         while (*pos) {
501                 parent = *pos;
502                 tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node);
503                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
504                 if (rc < 0)
505                         pos = &((*pos)->rb_left);
506                 else if (rc > 0)
507                         pos = &((*pos)->rb_right);
508                 else
509                         return tmp;
510         }
511
512         rb_link_node(&lrn->lrn_node, parent, pos);
513         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
514
515         return lrn;
516 }
517
518 extern const struct dt_index_operations lfsck_orphan_index_ops;
519
520 static int lfsck_rbtree_setup(const struct lu_env *env,
521                               struct lfsck_component *com)
522 {
523         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
524         struct lfsck_instance           *lfsck  = com->lc_lfsck;
525         struct dt_device                *dev    = lfsck->li_bottom;
526         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
527         struct dt_object                *obj;
528
529         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
530         fid->f_oid = lfsck_dev_idx(dev);
531         fid->f_ver = 0;
532         obj = dt_locate(env, dev, fid);
533         if (IS_ERR(obj))
534                 RETURN(PTR_ERR(obj));
535
536         /* Generate an in-RAM object to stand for the layout rbtree.
537          * Scanning the layout rbtree will be via the iteration over
538          * the object. In the future, the rbtree may be written onto
539          * disk with the object.
540          *
541          * Mark the object to be as exist. */
542         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
543         obj->do_index_ops = &lfsck_orphan_index_ops;
544         llsd->llsd_rb_obj = obj;
545         llsd->llsd_rbtree_valid = 1;
546         dev->dd_record_fid_accessed = 1;
547
548         return 0;
549 }
550
551 static void lfsck_rbtree_cleanup(const struct lu_env *env,
552                                  struct lfsck_component *com)
553 {
554         struct lfsck_instance           *lfsck = com->lc_lfsck;
555         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
556         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
557         struct rb_node                  *next;
558         struct lfsck_rbtree_node        *lrn;
559
560         lfsck->li_bottom->dd_record_fid_accessed = 0;
561         /* Invalid the rbtree, then no others will use it. */
562         write_lock(&llsd->llsd_rb_lock);
563         llsd->llsd_rbtree_valid = 0;
564         write_unlock(&llsd->llsd_rb_lock);
565
566         while (node != NULL) {
567                 next = rb_next(node);
568                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
569                 rb_erase(node, &llsd->llsd_rb_root);
570                 lfsck_rbtree_free(lrn);
571                 node = next;
572         }
573
574         if (llsd->llsd_rb_obj != NULL) {
575                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
576                 llsd->llsd_rb_obj = NULL;
577         }
578 }
579
580 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
581                                        struct lfsck_component *com,
582                                        const struct lu_fid *fid,
583                                        bool accessed)
584 {
585         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
586         struct lfsck_rbtree_node        *lrn;
587         bool                             insert = false;
588         int                              idx;
589         int                              rc     = 0;
590         ENTRY;
591
592         CDEBUG(D_LFSCK, "%s: update bitmap for "DFID"\n",
593                lfsck_lfsck2name(com->lc_lfsck), PFID(fid));
594
595         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
596                 RETURN_EXIT;
597
598         if (!fid_is_idif(fid) && !fid_is_norm(fid))
599                 RETURN_EXIT;
600
601         read_lock(&llsd->llsd_rb_lock);
602         if (!llsd->llsd_rbtree_valid)
603                 GOTO(unlock, rc = 0);
604
605         lrn = lfsck_rbtree_search(llsd, fid, NULL);
606         if (lrn == NULL) {
607                 struct lfsck_rbtree_node *tmp;
608
609                 LASSERT(!insert);
610
611                 read_unlock(&llsd->llsd_rb_lock);
612                 tmp = lfsck_rbtree_new(env, fid);
613                 if (IS_ERR(tmp))
614                         GOTO(out, rc = PTR_ERR(tmp));
615
616                 insert = true;
617                 write_lock(&llsd->llsd_rb_lock);
618                 if (!llsd->llsd_rbtree_valid) {
619                         lfsck_rbtree_free(tmp);
620                         GOTO(unlock, rc = 0);
621                 }
622
623                 lrn = lfsck_rbtree_insert(llsd, tmp);
624                 if (lrn != tmp)
625                         lfsck_rbtree_free(tmp);
626         }
627
628         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
629         /* Any accessed object must be a known object. */
630         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
631                 atomic_inc(&lrn->lrn_known_count);
632         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
633                 atomic_inc(&lrn->lrn_accessed_count);
634
635         GOTO(unlock, rc = 0);
636
637 unlock:
638         if (insert)
639                 write_unlock(&llsd->llsd_rb_lock);
640         else
641                 read_unlock(&llsd->llsd_rb_lock);
642 out:
643         if (rc != 0 && accessed) {
644                 struct lfsck_layout *lo = com->lc_file_ram;
645
646                 CERROR("%s: Fail to update object accessed bitmap, will cause "
647                        "incorrect LFSCK OST-object handling, so disable it to "
648                        "cancel orphan handling for related device. rc = %d.\n",
649                        lfsck_lfsck2name(com->lc_lfsck), rc);
650                 lo->ll_flags |= LF_INCOMPLETE;
651                 lfsck_rbtree_cleanup(env, com);
652         }
653 }
654
655 static inline bool is_dummy_lov_ost_data(struct lov_ost_data_v1 *obj)
656 {
657         if (fid_is_zero(&obj->l_ost_oi.oi_fid) &&
658             obj->l_ost_gen == 0 && obj->l_ost_idx == 0)
659                 return true;
660
661         return false;
662 }
663
664 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
665                                    const struct lfsck_layout *src)
666 {
667         int i;
668
669         des->ll_magic = le32_to_cpu(src->ll_magic);
670         des->ll_status = le32_to_cpu(src->ll_status);
671         des->ll_flags = le32_to_cpu(src->ll_flags);
672         des->ll_success_count = le32_to_cpu(src->ll_success_count);
673         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
674         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
675         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
676         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
677         des->ll_time_last_checkpoint =
678                                 le64_to_cpu(src->ll_time_last_checkpoint);
679         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
680         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
681         des->ll_pos_first_inconsistent =
682                         le64_to_cpu(src->ll_pos_first_inconsistent);
683         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
684         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
685         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
686         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
687         for (i = 0; i < LLIT_MAX; i++)
688                 des->ll_objs_repaired[i] =
689                                 le64_to_cpu(src->ll_objs_repaired[i]);
690         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
691 }
692
693 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
694                                    const struct lfsck_layout *src)
695 {
696         int i;
697
698         des->ll_magic = cpu_to_le32(src->ll_magic);
699         des->ll_status = cpu_to_le32(src->ll_status);
700         des->ll_flags = cpu_to_le32(src->ll_flags);
701         des->ll_success_count = cpu_to_le32(src->ll_success_count);
702         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
703         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
704         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
705         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
706         des->ll_time_last_checkpoint =
707                                 cpu_to_le64(src->ll_time_last_checkpoint);
708         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
709         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
710         des->ll_pos_first_inconsistent =
711                         cpu_to_le64(src->ll_pos_first_inconsistent);
712         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
713         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
714         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
715         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
716         for (i = 0; i < LLIT_MAX; i++)
717                 des->ll_objs_repaired[i] =
718                                 cpu_to_le64(src->ll_objs_repaired[i]);
719         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
720 }
721
722 /**
723  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
724  * \retval 0: succeed.
725  * \retval -ve: failed cases.
726  */
727 static int lfsck_layout_load(const struct lu_env *env,
728                              struct lfsck_component *com)
729 {
730         struct lfsck_layout             *lo     = com->lc_file_ram;
731         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
732         ssize_t                          size   = com->lc_file_size;
733         loff_t                           pos    = 0;
734         int                              rc;
735
736         rc = dbo->dbo_read(env, com->lc_obj,
737                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
738                            BYPASS_CAPA);
739         if (rc == 0) {
740                 return -ENOENT;
741         } else if (rc < 0) {
742                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
743                       lfsck_lfsck2name(com->lc_lfsck), rc);
744                 return rc;
745         } else if (rc != size) {
746                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
747                       lfsck_lfsck2name(com->lc_lfsck), rc);
748                 return 1;
749         }
750
751         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
752         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
753                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
754                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
755                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
756                 return 1;
757         }
758
759         return 0;
760 }
761
762 static int lfsck_layout_store(const struct lu_env *env,
763                               struct lfsck_component *com)
764 {
765         struct dt_object         *obj           = com->lc_obj;
766         struct lfsck_instance    *lfsck         = com->lc_lfsck;
767         struct lfsck_layout      *lo            = com->lc_file_disk;
768         struct thandle           *handle;
769         ssize_t                   size          = com->lc_file_size;
770         loff_t                    pos           = 0;
771         int                       rc;
772         ENTRY;
773
774         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
775         handle = dt_trans_create(env, lfsck->li_bottom);
776         if (IS_ERR(handle)) {
777                 rc = PTR_ERR(handle);
778                 CERROR("%s: fail to create trans for storing lfsck_layout: "
779                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
780                 RETURN(rc);
781         }
782
783         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
784                                      pos, handle);
785         if (rc != 0) {
786                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
787                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
788                 GOTO(out, rc);
789         }
790
791         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
792         if (rc != 0) {
793                 CERROR("%s: fail to start trans for storing lfsck_layout: "
794                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
795                 GOTO(out, rc);
796         }
797
798         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
799                              handle);
800         if (rc != 0)
801                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
802                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
803
804         GOTO(out, rc);
805
806 out:
807         dt_trans_stop(env, lfsck->li_bottom, handle);
808
809         return rc;
810 }
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
908                fid_seq(lfsck_dto2fid(obj)));
909
910         if (bk->lb_param & LPF_DRYRUN)
911                 return 0;
912
913         memset(la, 0, sizeof(*la));
914         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
915         la->la_valid = LA_MODE | LA_UID | LA_GID;
916         dof->dof_type = dt_mode_to_dft(S_IFREG);
917
918         th = dt_trans_create(env, dt);
919         if (IS_ERR(th))
920                 RETURN(rc = PTR_ERR(th));
921
922         rc = dt_declare_create(env, obj, la, NULL, dof, th);
923         if (rc != 0)
924                 GOTO(stop, rc);
925
926         rc = dt_declare_record_write(env, obj,
927                                      lfsck_buf_get(env, &lastid,
928                                                    sizeof(lastid)),
929                                      pos, th);
930         if (rc != 0)
931                 GOTO(stop, rc);
932
933         rc = dt_trans_start_local(env, dt, th);
934         if (rc != 0)
935                 GOTO(stop, rc);
936
937         dt_write_lock(env, obj, 0);
938         if (likely(!dt_object_exists(obj))) {
939                 rc = dt_create(env, obj, la, NULL, dof, th);
940                 if (rc == 0)
941                         rc = dt_record_write(env, obj,
942                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
943                                 &pos, th);
944         }
945         dt_write_unlock(env, obj);
946
947         GOTO(stop, rc);
948
949 stop:
950         dt_trans_stop(env, dt, th);
951
952         return rc;
953 }
954
955 static int
956 lfsck_layout_lastid_reload(const struct lu_env *env,
957                            struct lfsck_component *com,
958                            struct lfsck_layout_seq *lls)
959 {
960         __u64   lastid;
961         loff_t  pos     = 0;
962         int     rc;
963
964         dt_read_lock(env, lls->lls_lastid_obj, 0);
965         rc = dt_record_read(env, lls->lls_lastid_obj,
966                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
967         dt_read_unlock(env, lls->lls_lastid_obj);
968         if (unlikely(rc != 0))
969                 return rc;
970
971         lastid = le64_to_cpu(lastid);
972         if (lastid < lls->lls_lastid_known) {
973                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
974                 struct lfsck_layout     *lo     = com->lc_file_ram;
975
976                 lls->lls_lastid = lls->lls_lastid_known;
977                 lls->lls_dirty = 1;
978                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
979                         LASSERT(lfsck->li_out_notify != NULL);
980
981                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
982                                              LE_LASTID_REBUILDING);
983                         lo->ll_flags |= LF_CRASHED_LASTID;
984                 }
985         } else if (lastid >= lls->lls_lastid) {
986                 lls->lls_lastid = lastid;
987                 lls->lls_dirty = 0;
988         }
989
990         return 0;
991 }
992
993 static int
994 lfsck_layout_lastid_store(const struct lu_env *env,
995                           struct lfsck_component *com)
996 {
997         struct lfsck_instance           *lfsck  = com->lc_lfsck;
998         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
999         struct dt_device                *dt     = lfsck->li_bottom;
1000         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1001         struct lfsck_layout_seq         *lls;
1002         struct thandle                  *th;
1003         __u64                            lastid;
1004         int                              rc     = 0;
1005         int                              rc1    = 0;
1006
1007         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1008                 loff_t pos = 0;
1009
1010                 /* XXX: Add the code back if we really found related
1011                  *      inconsistent cases in the future. */
1012 #if 0
1013                 if (!lls->lls_dirty) {
1014                         /* In OFD, before the pre-creation, the LAST_ID
1015                          * file will be updated firstly, which may hide
1016                          * some potential crashed cases. For example:
1017                          *
1018                          * The old obj1's ID is higher than old LAST_ID
1019                          * but lower than the new LAST_ID, but the LFSCK
1020                          * have not touch the obj1 until the OFD updated
1021                          * the LAST_ID. So the LFSCK does not regard it
1022                          * as crashed case. But when OFD does not create
1023                          * successfully, it will set the LAST_ID as the
1024                          * real created objects' ID, then LFSCK needs to
1025                          * found related inconsistency. */
1026                         rc = lfsck_layout_lastid_reload(env, com, lls);
1027                         if (likely(!lls->lls_dirty))
1028                                 continue;
1029                 }
1030 #endif
1031
1032                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
1033                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
1034
1035                 if (bk->lb_param & LPF_DRYRUN) {
1036                         lls->lls_dirty = 0;
1037                         continue;
1038                 }
1039
1040                 th = dt_trans_create(env, dt);
1041                 if (IS_ERR(th)) {
1042                         rc1 = PTR_ERR(th);
1043                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
1044                                lfsck_lfsck2name(com->lc_lfsck),
1045                                lls->lls_seq, rc1);
1046                         continue;
1047                 }
1048
1049                 lastid = cpu_to_le64(lls->lls_lastid);
1050                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1051                                              lfsck_buf_get(env, &lastid,
1052                                              sizeof(lastid)), pos, th);
1053                 if (rc != 0)
1054                         goto stop;
1055
1056                 rc = dt_trans_start_local(env, dt, th);
1057                 if (rc != 0)
1058                         goto stop;
1059
1060                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1061                 rc = dt_record_write(env, lls->lls_lastid_obj,
1062                                      lfsck_buf_get(env, &lastid,
1063                                      sizeof(lastid)), &pos, th);
1064                 dt_write_unlock(env, lls->lls_lastid_obj);
1065                 if (rc == 0)
1066                         lls->lls_dirty = 0;
1067
1068 stop:
1069                 dt_trans_stop(env, dt, th);
1070                 if (rc != 0) {
1071                         rc1 = rc;
1072                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
1073                                lfsck_lfsck2name(com->lc_lfsck),
1074                                lls->lls_seq, rc1);
1075                 }
1076         }
1077
1078         return rc1;
1079 }
1080
1081 static int
1082 lfsck_layout_lastid_load(const struct lu_env *env,
1083                          struct lfsck_component *com,
1084                          struct lfsck_layout_seq *lls)
1085 {
1086         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1087         struct lfsck_layout     *lo     = com->lc_file_ram;
1088         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1089         struct dt_object        *obj;
1090         loff_t                   pos    = 0;
1091         int                      rc;
1092         ENTRY;
1093
1094         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1095         obj = dt_locate(env, lfsck->li_bottom, fid);
1096         if (IS_ERR(obj))
1097                 RETURN(PTR_ERR(obj));
1098
1099         /* LAST_ID crashed, to be rebuilt */
1100         if (!dt_object_exists(obj)) {
1101                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1102                         LASSERT(lfsck->li_out_notify != NULL);
1103
1104                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1105                                              LE_LASTID_REBUILDING);
1106                         lo->ll_flags |= LF_CRASHED_LASTID;
1107
1108                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1109                             cfs_fail_val > 0) {
1110                                 struct l_wait_info lwi = LWI_TIMEOUT(
1111                                                 cfs_time_seconds(cfs_fail_val),
1112                                                 NULL, NULL);
1113
1114                                 up_write(&com->lc_sem);
1115                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1116                                              !thread_is_running(&lfsck->li_thread),
1117                                              &lwi);
1118                                 down_write(&com->lc_sem);
1119                         }
1120                 }
1121
1122                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1123         } else {
1124                 dt_read_lock(env, obj, 0);
1125                 rc = dt_read(env, obj,
1126                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1127                         &pos);
1128                 dt_read_unlock(env, obj);
1129                 if (rc != 0 && rc != sizeof(__u64))
1130                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1131
1132                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1133                         LASSERT(lfsck->li_out_notify != NULL);
1134
1135                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1136                                              LE_LASTID_REBUILDING);
1137                         lo->ll_flags |= LF_CRASHED_LASTID;
1138                 }
1139
1140                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1141                 rc = 0;
1142         }
1143
1144         GOTO(out, rc);
1145
1146 out:
1147         if (rc != 0)
1148                 lfsck_object_put(env, obj);
1149         else
1150                 lls->lls_lastid_obj = obj;
1151
1152         return rc;
1153 }
1154
1155 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1156                                                struct ptlrpc_request *req,
1157                                                void *args, int rc)
1158 {
1159         struct lfsck_async_interpret_args *laia = args;
1160         struct lfsck_component            *com  = laia->laia_com;
1161         struct lfsck_layout_master_data   *llmd = com->lc_data;
1162         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1163         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1164         struct lfsck_request              *lr   = laia->laia_lr;
1165
1166         switch (lr->lr_event) {
1167         case LE_START:
1168                 if (rc != 0) {
1169                         struct lfsck_layout *lo = com->lc_file_ram;
1170
1171                         CERROR("%s: fail to notify %s %x for layout start: "
1172                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1173                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1174                                ltd->ltd_index, rc);
1175                         lo->ll_flags |= LF_INCOMPLETE;
1176                         break;
1177                 }
1178
1179                 spin_lock(&ltds->ltd_lock);
1180                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1181                         spin_unlock(&ltds->ltd_lock);
1182                         break;
1183                 }
1184
1185                 if (lr->lr_flags & LEF_TO_OST) {
1186                         if (list_empty(&ltd->ltd_layout_list))
1187                                 list_add_tail(&ltd->ltd_layout_list,
1188                                               &llmd->llmd_ost_list);
1189                         if (list_empty(&ltd->ltd_layout_phase_list))
1190                                 list_add_tail(&ltd->ltd_layout_phase_list,
1191                                               &llmd->llmd_ost_phase1_list);
1192                 } else {
1193                         if (list_empty(&ltd->ltd_layout_list))
1194                                 list_add_tail(&ltd->ltd_layout_list,
1195                                               &llmd->llmd_mdt_list);
1196                         if (list_empty(&ltd->ltd_layout_phase_list))
1197                                 list_add_tail(&ltd->ltd_layout_phase_list,
1198                                               &llmd->llmd_mdt_phase1_list);
1199                 }
1200                 spin_unlock(&ltds->ltd_lock);
1201                 break;
1202         case LE_STOP:
1203         case LE_PHASE1_DONE:
1204         case LE_PHASE2_DONE:
1205         case LE_PEER_EXIT:
1206                 if (rc != 0 && rc != -EALREADY)
1207                         CWARN("%s: fail to notify %s %x for layout: "
1208                               "event = %d, rc = %d\n",
1209                               lfsck_lfsck2name(com->lc_lfsck),
1210                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1211                               ltd->ltd_index, lr->lr_event, rc);
1212                 break;
1213         case LE_QUERY: {
1214                 struct lfsck_reply *reply;
1215
1216                 if (rc != 0) {
1217                         spin_lock(&ltds->ltd_lock);
1218                         list_del_init(&ltd->ltd_layout_phase_list);
1219                         list_del_init(&ltd->ltd_layout_list);
1220                         spin_unlock(&ltds->ltd_lock);
1221                         break;
1222                 }
1223
1224                 reply = req_capsule_server_get(&req->rq_pill,
1225                                                &RMF_LFSCK_REPLY);
1226                 if (reply == NULL) {
1227                         rc = -EPROTO;
1228                         CERROR("%s: invalid return value: rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck), rc);
1230                         spin_lock(&ltds->ltd_lock);
1231                         list_del_init(&ltd->ltd_layout_phase_list);
1232                         list_del_init(&ltd->ltd_layout_list);
1233                         spin_unlock(&ltds->ltd_lock);
1234                         break;
1235                 }
1236
1237                 switch (reply->lr_status) {
1238                 case LS_SCANNING_PHASE1:
1239                         break;
1240                 case LS_SCANNING_PHASE2:
1241                         spin_lock(&ltds->ltd_lock);
1242                         list_del_init(&ltd->ltd_layout_phase_list);
1243                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1244                                 spin_unlock(&ltds->ltd_lock);
1245                                 break;
1246                         }
1247
1248                         if (lr->lr_flags & LEF_TO_OST)
1249                                 list_add_tail(&ltd->ltd_layout_phase_list,
1250                                               &llmd->llmd_ost_phase2_list);
1251                         else
1252                                 list_add_tail(&ltd->ltd_layout_phase_list,
1253                                               &llmd->llmd_mdt_phase2_list);
1254                         spin_unlock(&ltds->ltd_lock);
1255                         break;
1256                 default:
1257                         spin_lock(&ltds->ltd_lock);
1258                         list_del_init(&ltd->ltd_layout_phase_list);
1259                         list_del_init(&ltd->ltd_layout_list);
1260                         spin_unlock(&ltds->ltd_lock);
1261                         break;
1262                 }
1263                 break;
1264         }
1265         default:
1266                 CERROR("%s: unexpected event: rc = %d\n",
1267                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1268                 break;
1269         }
1270
1271         if (!laia->laia_shared) {
1272                 lfsck_tgt_put(ltd);
1273                 lfsck_component_put(env, com);
1274         }
1275
1276         return 0;
1277 }
1278
1279 static int lfsck_layout_master_query_others(const struct lu_env *env,
1280                                             struct lfsck_component *com)
1281 {
1282         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1283         struct lfsck_request              *lr    = &info->lti_lr;
1284         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1285         struct lfsck_instance             *lfsck = com->lc_lfsck;
1286         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1287         struct ptlrpc_request_set         *set;
1288         struct lfsck_tgt_descs            *ltds;
1289         struct lfsck_tgt_desc             *ltd;
1290         struct list_head                  *head;
1291         int                                rc    = 0;
1292         int                                rc1   = 0;
1293         ENTRY;
1294
1295         set = ptlrpc_prep_set();
1296         if (set == NULL)
1297                 RETURN(-ENOMEM);
1298
1299         llmd->llmd_touch_gen++;
1300         memset(lr, 0, sizeof(*lr));
1301         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1302         lr->lr_event = LE_QUERY;
1303         lr->lr_active = LT_LAYOUT;
1304         laia->laia_com = com;
1305         laia->laia_lr = lr;
1306         laia->laia_shared = 0;
1307
1308         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1309                 ltds = &lfsck->li_mdt_descs;
1310                 lr->lr_flags = 0;
1311                 head = &llmd->llmd_mdt_phase1_list;
1312         } else {
1313
1314 again:
1315                 ltds = &lfsck->li_ost_descs;
1316                 lr->lr_flags = LEF_TO_OST;
1317                 head = &llmd->llmd_ost_phase1_list;
1318         }
1319
1320         laia->laia_ltds = ltds;
1321         spin_lock(&ltds->ltd_lock);
1322         while (!list_empty(head)) {
1323                 ltd = list_entry(head->next,
1324                                  struct lfsck_tgt_desc,
1325                                  ltd_layout_phase_list);
1326                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1327                         break;
1328
1329                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1330                 list_del(&ltd->ltd_layout_phase_list);
1331                 list_add_tail(&ltd->ltd_layout_phase_list, head);
1332                 atomic_inc(&ltd->ltd_ref);
1333                 laia->laia_ltd = ltd;
1334                 spin_unlock(&ltds->ltd_lock);
1335                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1336                                          lfsck_layout_master_async_interpret,
1337                                          laia, LFSCK_QUERY);
1338                 if (rc != 0) {
1339                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
1340                                lfsck_lfsck2name(lfsck),
1341                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1342                                ltd->ltd_index, rc);
1343                         lfsck_tgt_put(ltd);
1344                         rc1 = rc;
1345                 }
1346                 spin_lock(&ltds->ltd_lock);
1347         }
1348         spin_unlock(&ltds->ltd_lock);
1349
1350         rc = ptlrpc_set_wait(set);
1351         if (rc < 0) {
1352                 ptlrpc_set_destroy(set);
1353                 RETURN(rc);
1354         }
1355
1356         if (!(lr->lr_flags & LEF_TO_OST) &&
1357             list_empty(&llmd->llmd_mdt_phase1_list))
1358                 goto again;
1359
1360         ptlrpc_set_destroy(set);
1361
1362         RETURN(rc1 != 0 ? rc1 : rc);
1363 }
1364
1365 static inline bool
1366 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1367 {
1368         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1369                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1370                 list_empty(&llmd->llmd_ost_phase1_list));
1371 }
1372
1373 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1374                                              struct lfsck_component *com,
1375                                              struct lfsck_request *lr)
1376 {
1377         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1378         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1379         struct lfsck_instance             *lfsck = com->lc_lfsck;
1380         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1381         struct lfsck_layout               *lo    = com->lc_file_ram;
1382         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1383         struct ptlrpc_request_set         *set;
1384         struct lfsck_tgt_descs            *ltds;
1385         struct lfsck_tgt_desc             *ltd;
1386         struct lfsck_tgt_desc             *next;
1387         struct list_head                  *head;
1388         __u32                              idx;
1389         int                                rc    = 0;
1390         ENTRY;
1391
1392         set = ptlrpc_prep_set();
1393         if (set == NULL)
1394                 RETURN(-ENOMEM);
1395
1396         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1397         lr->lr_active = LT_LAYOUT;
1398         laia->laia_com = com;
1399         laia->laia_lr = lr;
1400         laia->laia_shared = 0;
1401         switch (lr->lr_event) {
1402         case LE_START:
1403                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1404                 ltds = &lfsck->li_ost_descs;
1405                 laia->laia_ltds = ltds;
1406                 down_read(&ltds->ltd_rw_sem);
1407                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1408                         ltd = lfsck_tgt_get(ltds, idx);
1409                         LASSERT(ltd != NULL);
1410
1411                         laia->laia_ltd = ltd;
1412                         ltd->ltd_layout_done = 0;
1413                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1414                                         lfsck_layout_master_async_interpret,
1415                                         laia, LFSCK_NOTIFY);
1416                         if (rc != 0) {
1417                                 CERROR("%s: fail to notify %s %x for layout "
1418                                        "start: rc = %d\n",
1419                                        lfsck_lfsck2name(lfsck),
1420                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1421                                        "MDT", idx, rc);
1422                                 lfsck_tgt_put(ltd);
1423                                 lo->ll_flags |= LF_INCOMPLETE;
1424                         }
1425                 }
1426                 up_read(&ltds->ltd_rw_sem);
1427
1428                 /* Sync up */
1429                 rc = ptlrpc_set_wait(set);
1430                 if (rc < 0) {
1431                         ptlrpc_set_destroy(set);
1432                         RETURN(rc);
1433                 }
1434
1435                 if (!(bk->lb_param & LPF_ALL_TGT))
1436                         break;
1437
1438                 /* link other MDT targets locallly. */
1439                 spin_lock(&ltds->ltd_lock);
1440                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1441                         ltd = LTD_TGT(ltds, idx);
1442                         LASSERT(ltd != NULL);
1443
1444                         if (!list_empty(&ltd->ltd_layout_list))
1445                                 continue;
1446
1447                         list_add_tail(&ltd->ltd_layout_list,
1448                                       &llmd->llmd_mdt_list);
1449                         list_add_tail(&ltd->ltd_layout_phase_list,
1450                                       &llmd->llmd_mdt_phase1_list);
1451                 }
1452                 spin_unlock(&ltds->ltd_lock);
1453                 break;
1454         case LE_STOP:
1455         case LE_PHASE2_DONE:
1456         case LE_PEER_EXIT: {
1457                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1458                 if (bk->lb_param & LPF_ALL_TGT) {
1459                         head = &llmd->llmd_mdt_list;
1460                         ltds = &lfsck->li_mdt_descs;
1461                         if (lr->lr_event == LE_STOP) {
1462                                 /* unlink other MDT targets locallly. */
1463                                 spin_lock(&ltds->ltd_lock);
1464                                 list_for_each_entry_safe(ltd, next, head,
1465                                                          ltd_layout_list) {
1466                                         list_del_init(&ltd->ltd_layout_phase_list);
1467                                         list_del_init(&ltd->ltd_layout_list);
1468                                 }
1469                                 spin_unlock(&ltds->ltd_lock);
1470
1471                                 lr->lr_flags |= LEF_TO_OST;
1472                                 head = &llmd->llmd_ost_list;
1473                                 ltds = &lfsck->li_ost_descs;
1474                         } else {
1475                                 lr->lr_flags &= ~LEF_TO_OST;
1476                         }
1477                 } else {
1478                         lr->lr_flags |= LEF_TO_OST;
1479                         head = &llmd->llmd_ost_list;
1480                         ltds = &lfsck->li_ost_descs;
1481                 }
1482
1483 again:
1484                 laia->laia_ltds = ltds;
1485                 spin_lock(&ltds->ltd_lock);
1486                 while (!list_empty(head)) {
1487                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1488                                          ltd_layout_list);
1489                         if (!list_empty(&ltd->ltd_layout_phase_list))
1490                                 list_del_init(&ltd->ltd_layout_phase_list);
1491                         list_del_init(&ltd->ltd_layout_list);
1492                         atomic_inc(&ltd->ltd_ref);
1493                         laia->laia_ltd = ltd;
1494                         spin_unlock(&ltds->ltd_lock);
1495                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1496                                         lfsck_layout_master_async_interpret,
1497                                         laia, LFSCK_NOTIFY);
1498                         if (rc != 0) {
1499                                 CERROR("%s: fail to notify %s %x for layout "
1500                                        "stop/phase2: rc = %d\n",
1501                                        lfsck_lfsck2name(lfsck),
1502                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1503                                        "MDT", ltd->ltd_index, rc);
1504                                 lfsck_tgt_put(ltd);
1505                         }
1506                         spin_lock(&ltds->ltd_lock);
1507                 }
1508                 spin_unlock(&ltds->ltd_lock);
1509
1510                 rc = ptlrpc_set_wait(set);
1511                 if (rc < 0) {
1512                         ptlrpc_set_destroy(set);
1513                         RETURN(rc);
1514                 }
1515
1516                 if (!(lr->lr_flags & LEF_TO_OST)) {
1517                         lr->lr_flags |= LEF_TO_OST;
1518                         head = &llmd->llmd_ost_list;
1519                         ltds = &lfsck->li_ost_descs;
1520                         goto again;
1521                 }
1522                 break;
1523         }
1524         case LE_PHASE1_DONE:
1525                 llmd->llmd_touch_gen++;
1526                 ltds = &lfsck->li_mdt_descs;
1527                 laia->laia_ltds = ltds;
1528                 spin_lock(&ltds->ltd_lock);
1529                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1530                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1531                                          struct lfsck_tgt_desc,
1532                                          ltd_layout_phase_list);
1533                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1534                                 break;
1535
1536                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1537                         list_del_init(&ltd->ltd_layout_phase_list);
1538                         list_add_tail(&ltd->ltd_layout_phase_list,
1539                                       &llmd->llmd_mdt_phase1_list);
1540                         atomic_inc(&ltd->ltd_ref);
1541                         laia->laia_ltd = ltd;
1542                         spin_unlock(&ltds->ltd_lock);
1543                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1544                                         lfsck_layout_master_async_interpret,
1545                                         laia, LFSCK_NOTIFY);
1546                         if (rc != 0) {
1547                                 CERROR("%s: fail to notify MDT %x for layout "
1548                                        "phase1 done: rc = %d\n",
1549                                        lfsck_lfsck2name(lfsck),
1550                                        ltd->ltd_index, rc);
1551                                 lfsck_tgt_put(ltd);
1552                         }
1553                         spin_lock(&ltds->ltd_lock);
1554                 }
1555                 spin_unlock(&ltds->ltd_lock);
1556                 break;
1557         default:
1558                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1559                        lfsck_lfsck2name(lfsck), lr->lr_event);
1560                 rc = -EINVAL;
1561                 break;
1562         }
1563
1564         rc = ptlrpc_set_wait(set);
1565         ptlrpc_set_destroy(set);
1566
1567         RETURN(rc);
1568 }
1569
1570 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1571                                            struct lfsck_component *com,
1572                                            int rc)
1573 {
1574         struct lfsck_instance   *lfsck = com->lc_lfsck;
1575         struct lfsck_layout     *lo    = com->lc_file_ram;
1576         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1577
1578         down_write(&com->lc_sem);
1579
1580         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1581                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1582         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1583         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1584
1585         if (rc > 0) {
1586                 com->lc_journal = 0;
1587                 if (lo->ll_flags & LF_INCOMPLETE)
1588                         lo->ll_status = LS_PARTIAL;
1589                 else
1590                         lo->ll_status = LS_COMPLETED;
1591                 if (!(bk->lb_param & LPF_DRYRUN))
1592                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1593                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1594                 lo->ll_success_count++;
1595         } else if (rc == 0) {
1596                 lo->ll_status = lfsck->li_status;
1597                 if (lo->ll_status == 0)
1598                         lo->ll_status = LS_STOPPED;
1599         } else {
1600                 lo->ll_status = LS_FAILED;
1601         }
1602
1603         if (lo->ll_status != LS_PAUSED) {
1604                 spin_lock(&lfsck->li_lock);
1605                 list_del_init(&com->lc_link);
1606                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1607                 spin_unlock(&lfsck->li_lock);
1608         }
1609
1610         rc = lfsck_layout_store(env, com);
1611
1612         up_write(&com->lc_sem);
1613
1614         return rc;
1615 }
1616
1617 static int lfsck_layout_lock(const struct lu_env *env,
1618                              struct lfsck_component *com,
1619                              struct dt_object *obj,
1620                              struct lustre_handle *lh, __u64 bits)
1621 {
1622         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1623         ldlm_policy_data_t              *policy = &info->lti_policy;
1624         struct ldlm_res_id              *resid  = &info->lti_resid;
1625         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1626         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1627         int                              rc;
1628
1629         LASSERT(lfsck->li_namespace != NULL);
1630
1631         memset(policy, 0, sizeof(*policy));
1632         policy->l_inodebits.bits = bits;
1633         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1634         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1635                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1636                                     ldlm_completion_ast, NULL, NULL, 0,
1637                                     LVB_T_NONE, NULL, lh);
1638         if (rc == ELDLM_OK) {
1639                 rc = 0;
1640         } else {
1641                 memset(lh, 0, sizeof(*lh));
1642                 rc = -EIO;
1643         }
1644
1645         return rc;
1646 }
1647
1648 static void lfsck_layout_unlock(struct lustre_handle *lh)
1649 {
1650         if (lustre_handle_is_used(lh)) {
1651                 ldlm_lock_decref(lh, LCK_EX);
1652                 memset(lh, 0, sizeof(*lh));
1653         }
1654 }
1655
1656 static int lfsck_layout_trans_stop(const struct lu_env *env,
1657                                    struct dt_device *dev,
1658                                    struct thandle *handle, int result)
1659 {
1660         int rc;
1661
1662         handle->th_result = result;
1663         rc = dt_trans_stop(env, dev, handle);
1664         if (rc > 0)
1665                 rc = 0;
1666         else if (rc == 0)
1667                 rc = 1;
1668
1669         return rc;
1670 }
1671
1672 /**
1673  * \retval       +1: repaired
1674  * \retval        0: did nothing
1675  * \retval      -ve: on error
1676  */
1677 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1678                                      struct thandle *handle,
1679                                      struct dt_object *parent,
1680                                      struct lu_fid *cfid,
1681                                      struct lu_buf *buf,
1682                                      struct lov_ost_data_v1 *slot,
1683                                      int fl, __u32 ost_idx)
1684 {
1685         struct ost_id   *oi     = &lfsck_env_info(env)->lti_oi;
1686         int              rc;
1687
1688         fid_to_ostid(cfid, oi);
1689         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1690         slot->l_ost_gen = cpu_to_le32(0);
1691         slot->l_ost_idx = cpu_to_le32(ost_idx);
1692         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
1693                           BYPASS_CAPA);
1694         if (rc == 0)
1695                 rc = 1;
1696
1697         return rc;
1698 }
1699
1700 /**
1701  * \retval       +1: repaired
1702  * \retval        0: did nothing
1703  * \retval      -ve: on error
1704  */
1705 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1706                                      struct thandle *handle,
1707                                      struct dt_object *parent,
1708                                      struct lu_fid *cfid,
1709                                      struct lu_buf *buf, int fl,
1710                                      __u32 ost_idx, __u32 ea_off)
1711 {
1712         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1713         struct lov_ost_data_v1  *objs;
1714         int                      rc;
1715         ENTRY;
1716
1717         if (fl == LU_XATTR_CREATE) {
1718                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1,
1719                                                        LOV_MAGIC_V1));
1720
1721                 memset(lmm, 0, buf->lb_len);
1722                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1723                 /* XXX: currently, we only support LOV_PATTERN_RAID0. */
1724                 lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
1725                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1726                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1727                 /* XXX: We cannot know the stripe size,
1728                  *      then use the default value (1 MB). */
1729                 lmm->lmm_stripe_size =
1730                         cpu_to_le32(LOV_DESC_STRIPE_SIZE_DEFAULT);
1731                 objs = &(lmm->lmm_objects[ea_off]);
1732         } else {
1733                 __u16   count = le16_to_cpu(lmm->lmm_stripe_count);
1734                 int     gap   = ea_off - count;
1735                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1736
1737                 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3
1738                  * which has been verified in lfsck_layout_verify_header()
1739                  * already. If some new magic introduced in the future,
1740                  * then layout LFSCK needs to be updated also. */
1741                 if (magic == LOV_MAGIC_V1) {
1742                         objs = &(lmm->lmm_objects[count]);
1743                 } else {
1744                         LASSERT(magic == LOV_MAGIC_V3);
1745                         objs = &((struct lov_mds_md_v3 *)lmm)->
1746                                                         lmm_objects[count];
1747                 }
1748
1749                 if (gap > 0)
1750                         memset(objs, 0, gap * sizeof(*objs));
1751                 lmm->lmm_layout_gen =
1752                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1753                 objs += gap;
1754
1755                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1, magic));
1756         }
1757
1758         lmm->lmm_stripe_count = cpu_to_le16(ea_off + 1);
1759         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1760                                        fl, ost_idx);
1761
1762         RETURN(rc);
1763 }
1764
1765 /**
1766  * \retval       +1: repaired
1767  * \retval        0: did nothing
1768  * \retval      -ve: on error
1769  */
1770 static int lfsck_layout_update_pfid(const struct lu_env *env,
1771                                     struct lfsck_component *com,
1772                                     struct dt_object *parent,
1773                                     struct lu_fid *cfid,
1774                                     struct dt_device *cdev, __u32 ea_off)
1775 {
1776         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1777         struct dt_object        *child;
1778         struct thandle          *handle;
1779         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1780         struct lu_buf           *buf;
1781         int                      rc     = 0;
1782         ENTRY;
1783
1784         child = lfsck_object_find_by_dev(env, cdev, cfid);
1785         if (IS_ERR(child))
1786                 RETURN(PTR_ERR(child));
1787
1788         handle = dt_trans_create(env, cdev);
1789         if (IS_ERR(handle))
1790                 GOTO(out, rc = PTR_ERR(handle));
1791
1792         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1793         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1794         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1795          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1796          * parent MDT-object's layout EA. */
1797         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1798         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1799
1800         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1801         if (rc != 0)
1802                 GOTO(stop, rc);
1803
1804         rc = dt_trans_start(env, cdev, handle);
1805         if (rc != 0)
1806                 GOTO(stop, rc);
1807
1808         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1809                           BYPASS_CAPA);
1810
1811         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1812
1813 stop:
1814         dt_trans_stop(env, cdev, handle);
1815
1816 out:
1817         lu_object_put(env, &child->do_lu);
1818
1819         return rc;
1820 }
1821
1822 /**
1823  * \retval       +1: repaired
1824  * \retval        0: did nothing
1825  * \retval      -ve: on error
1826  */
1827 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1828                                         struct lfsck_component *com,
1829                                         struct lfsck_tgt_desc *ltd,
1830                                         struct lu_orphan_rec *rec,
1831                                         struct lu_fid *cfid,
1832                                         const char *prefix,
1833                                         const char *postfix,
1834                                         __u32 ea_off)
1835 {
1836         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1837         char                            *name   = info->lti_key;
1838         struct lu_attr                  *la     = &info->lti_la;
1839         struct dt_object_format         *dof    = &info->lti_dof;
1840         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1841         struct lu_fid                   *pfid   = &rec->lor_fid;
1842         struct lu_fid                   *tfid   = &info->lti_fid3;
1843         struct dt_device                *next   = lfsck->li_next;
1844         struct dt_object                *pobj   = NULL;
1845         struct dt_object                *cobj   = NULL;
1846         struct thandle                  *th     = NULL;
1847         struct lu_buf                   *pbuf   = NULL;
1848         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1849         struct lustre_handle             lh     = { 0 };
1850         int                              buflen = ea_buf->lb_len;
1851         int                              idx    = 0;
1852         int                              rc     = 0;
1853         ENTRY;
1854
1855         /* Create .lustre/lost+found/MDTxxxx when needed. */
1856         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1857                 rc = lfsck_create_lpf(env, lfsck);
1858                 if (rc != 0)
1859                         RETURN(rc);
1860         }
1861
1862         if (fid_is_zero(pfid)) {
1863                 struct filter_fid *ff = &info->lti_new_pfid;
1864
1865                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1866                 if (rc != 0)
1867                         RETURN(rc);
1868
1869                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1870                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1871                 /* Currently, the filter_fid::ff_parent::f_ver is not the
1872                  * real parent MDT-object's FID::f_ver, instead it is the
1873                  * OST-object index in its parent MDT-object's layout EA. */
1874                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1875                 pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
1876                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1877                 if (IS_ERR(cobj))
1878                         RETURN(PTR_ERR(cobj));
1879         }
1880
1881         CDEBUG(D_LFSCK, "Re-create the lost MDT-object: parent "
1882                DFID", child "DFID", OST-index %u, stripe-index %u, "
1883                "prefix %s, postfix %s\n",
1884                PFID(pfid), PFID(cfid), ltd->ltd_index, ea_off, prefix, postfix);
1885
1886         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1887         if (IS_ERR(pobj))
1888                 GOTO(put, rc = PTR_ERR(pobj));
1889
1890         LASSERT(prefix != NULL);
1891         LASSERT(postfix != NULL);
1892
1893         /** name rules:
1894          *
1895          *  1. Use the MDT-object's FID as the name with prefix and postfix.
1896          *
1897          *  1.1 prefix "C-":    More than one OST-objects claim the same
1898          *                      MDT-object and the same slot in the layout EA.
1899          *                      It may be created for dangling referenced MDT
1900          *                      object or may be not.
1901          *  1.2 prefix "N-":    The orphan OST-object does not know which one
1902          *                      is the real parent, so the LFSCK assign a new
1903          *                      FID as its parent.
1904          *  1.3 prefix "R-":    The orphan OST-object know its parent FID but
1905          *                      does not know the position in the namespace.
1906          *
1907          *  2. If there is name conflict, append more index for new name. */
1908         sprintf(name, "%s"DFID"%s", prefix, PFID(pfid), postfix);
1909         do {
1910                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1911                                (const struct dt_key *)name, BYPASS_CAPA);
1912                 if (rc != 0 && rc != -ENOENT)
1913                         GOTO(put, rc);
1914
1915                 if (unlikely(rc == 0)) {
1916                         CWARN("%s: The name %s under lost+found has been used "
1917                               "by the "DFID". Try to increase the FID version "
1918                               "for the new file name.\n",
1919                               lfsck_lfsck2name(lfsck), name, PFID(tfid));
1920                         sprintf(name, "%s"DFID"%s-%d", prefix, PFID(pfid),
1921                                 postfix, ++idx);
1922                 }
1923         } while (rc == 0);
1924
1925         memset(la, 0, sizeof(*la));
1926         la->la_uid = rec->lor_uid;
1927         la->la_gid = rec->lor_gid;
1928         la->la_mode = S_IFREG | S_IRUSR | S_IWUSR;
1929         la->la_valid = LA_MODE | LA_UID | LA_GID;
1930
1931         memset(dof, 0, sizeof(*dof));
1932         dof->dof_type = dt_mode_to_dft(S_IFREG);
1933
1934         rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1935         if (buflen < rc) {
1936                 lu_buf_realloc(ea_buf, rc);
1937                 buflen = ea_buf->lb_len;
1938                 if (ea_buf->lb_buf == NULL)
1939                         GOTO(put, rc = -ENOMEM);
1940         } else {
1941                 ea_buf->lb_len = rc;
1942         }
1943
1944         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
1945          *
1946          * XXX: Currently, we do not grab the PDO lock as normal create cases,
1947          *      because creating MDT-object for orphan OST-object is rare, we
1948          *      do not much care about the performance. It can be improved in
1949          *      the future when needed. */
1950         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
1951                                MDS_INODELOCK_UPDATE);
1952         if (rc != 0)
1953                 GOTO(put, rc);
1954
1955         th = dt_trans_create(env, next);
1956         if (IS_ERR(th))
1957                 GOTO(unlock, rc = PTR_ERR(th));
1958
1959         /* 1a. Update OST-object's parent information remotely.
1960          *
1961          * If other subsequent modifications failed, then next LFSCK scanning
1962          * will process the OST-object as orphan again with known parent FID. */
1963         if (cobj != NULL) {
1964                 rc = dt_declare_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th);
1965                 if (rc != 0)
1966                         GOTO(stop, rc);
1967         }
1968
1969         /* 2a. Create the MDT-object locally. */
1970         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
1971         if (rc != 0)
1972                 GOTO(stop, rc);
1973
1974         /* 3a. Add layout EA for the MDT-object. */
1975         rc = dt_declare_xattr_set(env, pobj, ea_buf, XATTR_NAME_LOV,
1976                                   LU_XATTR_CREATE, th);
1977         if (rc != 0)
1978                 GOTO(stop, rc);
1979
1980         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
1981         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
1982                                (const struct dt_rec *)pfid,
1983                                (const struct dt_key *)name, th);
1984         if (rc != 0)
1985                 GOTO(stop, rc);
1986
1987         rc = dt_trans_start(env, next, th);
1988         if (rc != 0)
1989                 GOTO(stop, rc);
1990
1991         /* 1b. Update OST-object's parent information remotely. */
1992         if (cobj != NULL) {
1993                 rc = dt_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th,
1994                                   BYPASS_CAPA);
1995                 if (rc != 0)
1996                         GOTO(stop, rc);
1997         }
1998
1999         dt_write_lock(env, pobj, 0);
2000         /* 2b. Create the MDT-object locally. */
2001         rc = dt_create(env, pobj, la, NULL, dof, th);
2002         if (rc == 0)
2003                 /* 3b. Add layout EA for the MDT-object. */
2004                 rc = lfsck_layout_extend_lovea(env, th, pobj, cfid, ea_buf,
2005                                                LU_XATTR_CREATE, ltd->ltd_index,
2006                                                ea_off);
2007         dt_write_unlock(env, pobj);
2008         if (rc < 0)
2009                 GOTO(stop, rc);
2010
2011         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2012         rc = dt_insert(env, lfsck->li_lpf_obj,
2013                        (const struct dt_rec *)pfid,
2014                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2015
2016         GOTO(stop, rc);
2017
2018 stop:
2019         dt_trans_stop(env, next, th);
2020
2021 unlock:
2022         lfsck_layout_unlock(&lh);
2023
2024 put:
2025         if (cobj != NULL && !IS_ERR(cobj))
2026                 lu_object_put(env, &cobj->do_lu);
2027         if (pobj != NULL && !IS_ERR(pobj))
2028                 lu_object_put(env, &pobj->do_lu);
2029         ea_buf->lb_len = buflen;
2030
2031         return rc >= 0 ? 1 : rc;
2032 }
2033
2034 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2035                                                    struct lfsck_component *com,
2036                                                    const struct lu_fid *fid,
2037                                                    __u32 index)
2038 {
2039         struct lfsck_thread_info *info  = lfsck_env_info(env);
2040         struct lfsck_request     *lr    = &info->lti_lr;
2041         struct lfsck_instance    *lfsck = com->lc_lfsck;
2042         struct lfsck_tgt_desc    *ltd;
2043         struct ptlrpc_request    *req;
2044         struct lfsck_request     *tmp;
2045         struct obd_export        *exp;
2046         int                       rc    = 0;
2047         ENTRY;
2048
2049         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2050         if (unlikely(ltd == NULL))
2051                 RETURN(-ENODEV);
2052
2053         exp = ltd->ltd_exp;
2054         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2055                 GOTO(put, rc = -EOPNOTSUPP);
2056
2057         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2058         if (req == NULL)
2059                 GOTO(put, rc = -ENOMEM);
2060
2061         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2062         if (rc != 0) {
2063                 ptlrpc_request_free(req);
2064
2065                 GOTO(put, rc);
2066         }
2067
2068         memset(lr, 0, sizeof(*lr));
2069         lr->lr_event = LE_CONDITIONAL_DESTROY;
2070         lr->lr_active = LT_LAYOUT;
2071         lr->lr_fid = *fid;
2072
2073         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2074         *tmp = *lr;
2075         ptlrpc_request_set_replen(req);
2076
2077         rc = ptlrpc_queue_wait(req);
2078         ptlrpc_req_finished(req);
2079
2080         GOTO(put, rc);
2081
2082 put:
2083         lfsck_tgt_put(ltd);
2084
2085         return rc;
2086 }
2087
2088 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2089                                                   struct lfsck_component *com,
2090                                                   struct lfsck_request *lr)
2091 {
2092         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2093         struct lu_attr                  *la     = &info->lti_la;
2094         ldlm_policy_data_t              *policy = &info->lti_policy;
2095         struct ldlm_res_id              *resid  = &info->lti_resid;
2096         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2097         struct dt_device                *dev    = lfsck->li_bottom;
2098         struct lu_fid                   *fid    = &lr->lr_fid;
2099         struct dt_object                *obj;
2100         struct thandle                  *th     = NULL;
2101         struct lustre_handle             lh     = { 0 };
2102         __u64                            flags  = 0;
2103         int                              rc     = 0;
2104         ENTRY;
2105
2106         obj = lfsck_object_find_by_dev(env, dev, fid);
2107         if (IS_ERR(obj))
2108                 RETURN(PTR_ERR(obj));
2109
2110         dt_read_lock(env, obj, 0);
2111         if (dt_object_exists(obj) == 0) {
2112                 dt_read_unlock(env, obj);
2113
2114                 GOTO(put, rc = -ENOENT);
2115         }
2116
2117         /* Get obj's attr without lock firstly. */
2118         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2119         dt_read_unlock(env, obj);
2120         if (rc != 0)
2121                 GOTO(put, rc);
2122
2123         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2124                 GOTO(put, rc = -ETXTBSY);
2125
2126         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2127         LASSERT(lfsck->li_namespace != NULL);
2128
2129         memset(policy, 0, sizeof(*policy));
2130         policy->l_extent.end = OBD_OBJECT_EOF;
2131         ost_fid_build_resid(fid, resid);
2132         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2133                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2134                                     ldlm_completion_ast, NULL, NULL, 0,
2135                                     LVB_T_NONE, NULL, &lh);
2136         if (rc != ELDLM_OK)
2137                 GOTO(put, rc = -EIO);
2138
2139         dt_write_lock(env, obj, 0);
2140         /* Get obj's attr within lock again. */
2141         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2142         if (rc != 0)
2143                 GOTO(unlock, rc);
2144
2145         if (la->la_ctime != 0)
2146                 GOTO(unlock, rc = -ETXTBSY);
2147
2148         th = dt_trans_create(env, dev);
2149         if (IS_ERR(th))
2150                 GOTO(unlock, rc = PTR_ERR(th));
2151
2152         rc = dt_declare_ref_del(env, obj, th);
2153         if (rc != 0)
2154                 GOTO(stop, rc);
2155
2156         rc = dt_declare_destroy(env, obj, th);
2157         if (rc != 0)
2158                 GOTO(stop, rc);
2159
2160         rc = dt_trans_start_local(env, dev, th);
2161         if (rc != 0)
2162                 GOTO(stop, rc);
2163
2164         rc = dt_ref_del(env, obj, th);
2165         if (rc != 0)
2166                 GOTO(stop, rc);
2167
2168         rc = dt_destroy(env, obj, th);
2169         if (rc == 0)
2170                 CDEBUG(D_LFSCK, "Destroy the empty OST-object "DFID" which "
2171                        "was created for reparing dangling referenced case. "
2172                        "But the original missed OST-object is found now.\n",
2173                        PFID(fid));
2174
2175         GOTO(stop, rc);
2176
2177 stop:
2178         dt_trans_stop(env, dev, th);
2179
2180 unlock:
2181         dt_write_unlock(env, obj);
2182         ldlm_lock_decref(&lh, LCK_EX);
2183
2184 put:
2185         lu_object_put(env, &obj->do_lu);
2186
2187         return rc;
2188 }
2189
2190 /**
2191  * Some OST-object has occupied the specified layout EA slot.
2192  * Such OST-object may be generated by the LFSCK when repair
2193  * dangling referenced MDT-object, which can be indicated by
2194  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2195  * is true and such OST-object has not been modified yet, we
2196  * will replace it with the orphan OST-object; otherwise the
2197  * LFSCK will create new MDT-object to reference the orphan.
2198  *
2199  * \retval       +1: repaired
2200  * \retval        0: did nothing
2201  * \retval      -ve: on error
2202  */
2203 static int lfsck_layout_conflict_create(const struct lu_env *env,
2204                                         struct lfsck_component *com,
2205                                         struct lfsck_tgt_desc *ltd,
2206                                         struct lu_orphan_rec *rec,
2207                                         struct dt_object *parent,
2208                                         struct lu_fid *cfid,
2209                                         struct lu_buf *ea_buf,
2210                                         struct lov_ost_data_v1 *slot,
2211                                         __u32 ea_off, __u32 ori_len)
2212 {
2213         struct lfsck_thread_info *info          = lfsck_env_info(env);
2214         struct lu_fid            *cfid2         = &info->lti_fid2;
2215         struct ost_id            *oi            = &info->lti_oi;
2216         char                     *postfix       = info->lti_tmpbuf;
2217         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2218         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2219         struct thandle           *th            = NULL;
2220         struct lustre_handle      lh            = { 0 };
2221         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2222         int                       rc            = 0;
2223         ENTRY;
2224
2225         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2226         ostid_to_fid(cfid2, oi, ost_idx2);
2227
2228         CDEBUG(D_LFSCK, "Handle layout EA conflict: parent "DFID
2229                ", cur-child "DFID" on the OST %u, orphan-child "
2230                DFID" on the OST %u, stripe-index %u\n",
2231                PFID(lfsck_dto2fid(parent)), PFID(cfid2), ost_idx2,
2232                PFID(cfid), ltd->ltd_index, ea_off);
2233
2234         /* Hold layout lock on the parent to prevent others to access. */
2235         rc = lfsck_layout_lock(env, com, parent, &lh,
2236                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2237         if (rc != 0)
2238                 GOTO(out, rc);
2239
2240         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2241
2242         /* If the conflict OST-obejct is not created for fixing dangling
2243          * referenced MDT-object in former LFSCK check/repair, or it has
2244          * been modified by others, then we cannot destroy it. Re-create
2245          * a new MDT-object for the orphan OST-object. */
2246         if (rc == -ETXTBSY) {
2247                 /* No need the layout lock on the original parent. */
2248                 lfsck_layout_unlock(&lh);
2249                 ea_buf->lb_len = ori_len;
2250
2251                 fid_zero(&rec->lor_fid);
2252                 snprintf(postfix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2253                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2254                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2255                                                   "C-", postfix, ea_off);
2256
2257                 RETURN(rc);
2258         }
2259
2260         if (rc != 0 && rc != -ENOENT)
2261                 GOTO(unlock, rc);
2262
2263         th = dt_trans_create(env, dev);
2264         if (IS_ERR(th))
2265                 GOTO(unlock, rc = PTR_ERR(th));
2266
2267         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2268                                   LU_XATTR_REPLACE, th);
2269         if (rc != 0)
2270                 GOTO(stop, rc);
2271
2272         rc = dt_trans_start_local(env, dev, th);
2273         if (rc != 0)
2274                 GOTO(stop, rc);
2275
2276         dt_write_lock(env, parent, 0);
2277         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2278         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2279                                        LU_XATTR_REPLACE, ltd->ltd_index);
2280         dt_write_unlock(env, parent);
2281
2282         GOTO(stop, rc);
2283
2284 stop:
2285         dt_trans_stop(env, dev, th);
2286
2287 unlock:
2288         lfsck_layout_unlock(&lh);
2289
2290 out:
2291         ea_buf->lb_len = ori_len;
2292
2293         return rc >= 0 ? 1 : rc;
2294 }
2295
2296 /**
      * Make the layout EA (XATTR_NAME_LOV) of \a parent reference the OST-object
      * \a cfid in the stripe slot \a ea_off.
      *
      * The parent is protected by the LAYOUT | XATTR ibits lock for the whole
      * operation and by dt_write_lock() while its layout EA is read and changed.
      * Depending on what is found in the layout EA, one of the following is done:
      * - no layout EA (or an empty one): create it via
      *   lfsck_layout_extend_lovea() with LU_XATTR_CREATE;
      * - the stripe count does not cover \a ea_off: extend the EA via
      *   lfsck_layout_extend_lovea();
      * - the slot \a ea_off is a dummy one: fill it via
      *   lfsck_layout_refill_lovea() after bumping the layout generation;
      * - \a cfid is already stored in the slot \a ea_off: nothing to do;
      * - \a cfid is stored in another slot: all locks are dropped and the
      *   OST-object's parent FID is fixed via lfsck_layout_update_pfid();
      * - the slot \a ea_off is used by another OST-object: all locks are dropped
      *   and the conflict is handled by lfsck_layout_conflict_create().
      *
      * In dry-run mode (LPF_DRYRUN) no transaction is created and 1 is returned
      * at the point where a modification would have been made.
      *
      * On every exit path of this function buf->lb_len is reset to the length of
      * the (possibly re-allocated) buffer, or the original length is handed to
      * lfsck_layout_conflict_create().
      *
      * \param[in] env     execution environment
      * \param[in] com     the layout LFSCK component
      * \param[in] ltd     target descriptor; ltd->ltd_tgt is used when the parent
      *                    FID of the OST-object has to be updated
      * \param[in] rec     orphan record, passed on to the conflict handling
      * \param[in] parent  the MDT-object whose layout EA is to be repaired
      * \param[in] cfid    FID of the orphan OST-object
      * \param[in] ost_idx index of the OST that holds \a cfid
      * \param[in] ea_off  stripe index the OST-object claims in the layout EA
      *
2297  * \retval       +1: repaired
2298  * \retval        0: did nothing
2299  * \retval      -ve: on error
2300  */
2301 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2302                                        struct lfsck_component *com,
2303                                        struct lfsck_tgt_desc *ltd,
2304                                        struct lu_orphan_rec *rec,
2305                                        struct dt_object *parent,
2306                                        struct lu_fid *cfid,
2307                                        __u32 ost_idx, __u32 ea_off)
2308 {
2309         struct lfsck_thread_info *info          = lfsck_env_info(env);
2310         struct lu_buf            *buf           = &info->lti_big_buf;
2311         struct lu_fid            *fid           = &info->lti_fid2;
2312         struct ost_id            *oi            = &info->lti_oi;
2313         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2314         struct dt_device         *dt            = lfsck->li_bottom;
2315         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2316         struct thandle            *handle       = NULL;
             /* Length of the shared buffer; restored into buf->lb_len on exit. */
2317         size_t                    buflen        = buf->lb_len;
2318         struct lov_mds_md_v1     *lmm;
2319         struct lov_ost_data_v1   *objs;
2320         struct lustre_handle      lh            = { 0 };
2321         __u32                     magic;
             /* xattr flag used for declaring and for the final set; it is 0 for
              * the first declaration and may be updated before a retry. */
2322         int                       fl            = 0;
2323         int                       rc            = 0;
2324         int                       rc1;
2325         int                       i;
2326         __u16                     count;
             /* Whether dt_write_lock() is currently held on the parent. */
2327         bool                      locked        = false;
2328         ENTRY;
2329
2330         CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent "
2331                DFID", child "DFID", OST-index %u, stripe-index %u\n",
2332                PFID(lfsck_dto2fid(parent)), PFID(cfid), ost_idx, ea_off);
2333
2334         rc = lfsck_layout_lock(env, com, parent, &lh,
2335                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2336         if (rc != 0)
2337                 RETURN(rc);
2338
             /* Retry entry point. It is reached with rc == 0 the first time, and
              * later with rc being either the buffer size that is required or a
              * negative error. The object lock and the transaction of the former
              * round (if any) are released before starting over. */
2339 again:
2340         if (locked) {
2341                 dt_write_unlock(env, parent);
2342                 locked = false;
2343         }
2344
2345         if (handle != NULL) {
2346                 dt_trans_stop(env, dt, handle);
2347                 handle = NULL;
2348         }
2349
2350         if (rc < 0)
2351                 GOTO(unlock_layout, rc);
2352
             /* Grow the buffer to the required size; the new length becomes the
              * one that is restored on exit. */
2353         if (buf->lb_len < rc) {
2354                 lu_buf_realloc(buf, rc);
2355                 buflen = buf->lb_len;
2356                 if (buf->lb_buf == NULL)
2357                         GOTO(unlock_layout, rc = -ENOMEM);
2358         }
2359
             /* A transaction is only needed when something may be modified. */
2360         if (!(bk->lb_param & LPF_DRYRUN)) {
2361                 handle = dt_trans_create(env, dt);
2362                 if (IS_ERR(handle))
2363                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2364
2365                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2366                                           fl, handle);
2367                 if (rc != 0)
2368                         GOTO(stop, rc);
2369
2370                 rc = dt_trans_start_local(env, dt, handle);
2371                 if (rc != 0)
2372                         GOTO(stop, rc);
2373         }
2374
2375         dt_write_lock(env, parent, 0);
2376         locked = true;
2377         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2378         if (rc == -ERANGE) {
                     /* The buffer is too small: query with LU_BUF_NULL to get
                      * the required size (or an error) and retry with it. */
2379                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2380                                   BYPASS_CAPA);
2381                 LASSERT(rc != 0);
2382                 goto again;
2383         } else if (rc == -ENODATA || rc == 0) {
                     /* No (or empty) layout EA: a new LOV_MAGIC_V1 one that
                      * covers the slot ea_off has to be created. */
2384                 rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2385                 /* If the declared is not big enough, re-try. */
2386                 if (buf->lb_len < rc)
2387                         goto again;
2388
2389                 fl = LU_XATTR_CREATE;
2390         } else if (rc < 0) {
2391                 GOTO(unlock_parent, rc);
2392         } else if (unlikely(buf->lb_len == 0)) {
                     /* rc > 0 with a zero-length buffer: nothing could have been
                      * copied, so retry and let the buffer be allocated. */
2393                 goto again;
2394         } else {
2395                 fl = LU_XATTR_REPLACE;
2396         }
2397
2398         if (fl == LU_XATTR_CREATE) {
2399                 if (bk->lb_param & LPF_DRYRUN)
2400                         GOTO(unlock_parent, rc = 1);
2401
2402                 LASSERT(buf->lb_len >= rc);
2403
                     /* Only the size of the new layout EA is to be written. */
2404                 buf->lb_len = rc;
2405                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
2406                                                fl, ost_idx, ea_off);
2407
2408                 GOTO(unlock_parent, rc);
2409         }
2410
             /* From here on rc keeps the size returned by dt_xattr_get(); the
              * header check uses rc1 so that this size is not lost. */
2411         lmm = buf->lb_buf;
2412         rc1 = lfsck_layout_verify_header(lmm);
2413         if (rc1 != 0)
2414                 GOTO(unlock_parent, rc = rc1);
2415
2416         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2417          * been verified in lfsck_layout_verify_header() already. If some
2418          * new magic introduced in the future, then layout LFSCK needs to
2419          * be updated also. */
2420         magic = le32_to_cpu(lmm->lmm_magic);
2421         if (magic == LOV_MAGIC_V1) {
2422                 objs = &(lmm->lmm_objects[0]);
2423         } else {
2424                 LASSERT(magic == LOV_MAGIC_V3);
2425                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2426         }
2427
2428         count = le16_to_cpu(lmm->lmm_stripe_count);
2429         if (count == 0)
2430                 GOTO(unlock_parent, rc = -EINVAL);
2431         LASSERT(count > 0);
2432
2433         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2434         if (count <= ea_off) {
2435                 if (bk->lb_param & LPF_DRYRUN)
2436                         GOTO(unlock_parent, rc = 1);
2437
2438                 rc = lov_mds_md_size(ea_off + 1, magic);
2439                 /* If the declared is not big enough, re-try. */
2440                 if (buf->lb_len < rc)
2441                         goto again;
2442
2443                 buf->lb_len = rc;
2444                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
2445                                                fl, ost_idx, ea_off);
2446                 GOTO(unlock_parent, rc);
2447         }
2448
2449         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2450
             /* Limit the buffer to the size of the layout EA that was read. */
2451         buf->lb_len = rc;
             /* Walk all the slots: the slot ea_off may be a dummy one to be
              * filled, or cfid may already be referenced by some slot. */
2452         for (i = 0; i < count; i++, objs++) {
2453                 /* The MDT-object was created via lfsck_layout_recover_create()
2454                  * by others before, and we fill the dummy layout EA. */
2455                 if (is_dummy_lov_ost_data(objs)) {
2456                         if (i != ea_off)
2457                                 continue;
2458
2459                         if (bk->lb_param & LPF_DRYRUN)
2460                                 GOTO(unlock_parent, rc = 1);
2461
2462                         lmm->lmm_layout_gen =
2463                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2464                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2465                                                        cfid, buf, objs, fl,
2466                                                        ost_idx);
2467                         GOTO(unlock_parent, rc);
2468                 }
2469
2470                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2471                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2472                 /* It should be rare case, the slot is there, but the LFSCK
2473                  * does not handle it during the first-phase cycle scanning. */
2474                 if (unlikely(lu_fid_eq(fid, cfid))) {
2475                         if (i == ea_off) {
2476                                 GOTO(unlock_parent, rc = 0);
2477                         } else {
2478                                 /* Rare case that the OST-object index
2479                                  * does not match the parent MDT-object
2480                                  * layout EA. We trust the later one. */
2481                                 if (bk->lb_param & LPF_DRYRUN)
2482                                         GOTO(unlock_parent, rc = 1);
2483
                                     /* Everything is released by hand here,
                                      * so return directly instead of going
                                      * through the cleanup labels. */
2484                                 dt_write_unlock(env, parent);
2485                                 if (handle != NULL)
2486                                         dt_trans_stop(env, dt, handle);
2487                                 lfsck_layout_unlock(&lh);
2488                                 buf->lb_len = buflen;
2489                                 rc = lfsck_layout_update_pfid(env, com, parent,
2490                                                         cfid, ltd->ltd_tgt, i);
2491
2492                                 RETURN(rc);
2493                         }
2494                 }
2495         }
2496
2497         /* The MDT-object exists, but related layout EA slot is occupied
2498          * by others. */
2499         if (bk->lb_param & LPF_DRYRUN)
2500                 GOTO(unlock_parent, rc = 1);
2501
             /* Release everything by hand, then hand the layout EA that was read
              * (still in buf), the conflicting slot and the buffer length to be
              * restored over to the conflict handling. */
2502         dt_write_unlock(env, parent);
2503         if (handle != NULL)
2504                 dt_trans_stop(env, dt, handle);
2505         lfsck_layout_unlock(&lh);
2506         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2507                 objs = &(lmm->lmm_objects[ea_off]);
2508         else
2509                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2510         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2511                                           buf, objs, ea_off, buflen);
2512
2513         RETURN(rc);
2514
2515 unlock_parent:
2516         if (locked)
2517                 dt_write_unlock(env, parent);
2518
2519 stop:
2520         if (handle != NULL)
2521                 dt_trans_stop(env, dt, handle);
2522
2523 unlock_layout:
2524         lfsck_layout_unlock(&lh);
2525         buf->lb_len = buflen;
2526
2527         return rc;
2528 }
2529
2530 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2531                                         struct lfsck_component *com,
2532                                         struct lfsck_tgt_desc *ltd,
2533                                         struct lu_orphan_rec *rec,
2534                                         struct lu_fid *cfid)
2535 {
2536         struct lfsck_layout     *lo     = com->lc_file_ram;
2537         struct lu_fid           *pfid   = &rec->lor_fid;
2538         struct dt_object        *parent = NULL;
2539         __u32                    ea_off = pfid->f_stripe_idx;
2540         int                      rc     = 0;
2541         ENTRY;
2542
2543         if (!fid_is_sane(cfid))
2544                 GOTO(out, rc = -EINVAL);
2545
2546         if (fid_is_zero(pfid)) {
2547                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2548                                                   "N-", "", ea_off);
2549                 GOTO(out, rc);
2550         }
2551
2552         pfid->f_ver = 0;
2553         if (!fid_is_sane(pfid))
2554                 GOTO(out, rc = -EINVAL);
2555
2556         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2557         if (IS_ERR(parent))
2558                 GOTO(out, rc = PTR_ERR(parent));
2559
2560         if (unlikely(dt_object_remote(parent) != 0))
2561                 GOTO(put, rc = -EXDEV);
2562
2563         if (dt_object_exists(parent) == 0) {
2564                 lu_object_put(env, &parent->do_lu);
2565                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2566                                                   "R-", "", ea_off);
2567                 GOTO(out, rc);
2568         }
2569
2570         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2571                 GOTO(put, rc = -EISDIR);
2572
2573         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2574                                          ltd->ltd_index, ea_off);
2575
2576         GOTO(put, rc);
2577
2578 put:
2579         if (rc <= 0)
2580                 lu_object_put(env, &parent->do_lu);
2581         else
2582                 /* The layout EA is changed, need to be reloaded next time. */
2583                 lu_object_put_nocache(env, &parent->do_lu);
2584
2585 out:
2586         down_write(&com->lc_sem);
2587         com->lc_new_scanned++;
2588         com->lc_new_checked++;
2589         if (rc > 0) {
2590                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2591                 rc = 0;
2592         } else if (rc < 0) {
2593                 lo->ll_objs_failed_phase2++;
2594         }
2595         up_write(&com->lc_sem);
2596
2597         return rc;
2598 }
2599
2600 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2601                                     struct lfsck_component *com,
2602                                     struct lfsck_tgt_desc *ltd)
2603 {
2604         struct lfsck_layout             *lo     = com->lc_file_ram;
2605         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2606         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2607         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2608         struct ost_id                   *oi     = &info->lti_oi;
2609         struct lu_fid                   *fid    = &info->lti_fid;
2610         struct dt_object                *obj;
2611         const struct dt_it_ops          *iops;
2612         struct dt_it                    *di;
2613         int                              rc     = 0;
2614         ENTRY;
2615
2616         CDEBUG(D_LFSCK, "%s: start the orphan scanning for OST%04x\n",
2617                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2618
2619         ostid_set_seq(oi, FID_SEQ_IDIF);
2620         ostid_set_id(oi, 0);
2621         ostid_to_fid(fid, oi, ltd->ltd_index);
2622         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2623         if (unlikely(IS_ERR(obj)))
2624                 RETURN(PTR_ERR(obj));
2625
2626         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2627         if (rc != 0)
2628                 GOTO(put, rc);
2629
2630         iops = &obj->do_index_ops->dio_it;
2631         di = iops->init(env, obj, 0, BYPASS_CAPA);
2632         if (IS_ERR(di))
2633                 GOTO(put, rc = PTR_ERR(di));
2634
2635         rc = iops->load(env, di, 0);
2636         if (rc == -ESRCH) {
2637                 /* -ESRCH means that the orphan OST-objects rbtree has been
2638                  * cleanup because of the OSS server restart or other errors. */
2639                 lo->ll_flags |= LF_INCOMPLETE;
2640                 GOTO(fini, rc);
2641         }
2642
2643         if (rc == 0)
2644                 rc = iops->next(env, di);
2645         else if (rc > 0)
2646                 rc = 0;
2647
2648         if (rc < 0)
2649                 GOTO(fini, rc);
2650
2651         if (rc > 0)
2652                 GOTO(fini, rc = 0);
2653
2654         do {
2655                 struct dt_key           *key;
2656                 struct lu_orphan_rec    *rec = &info->lti_rec;
2657
2658                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2659                     cfs_fail_val > 0) {
2660                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2661                         struct l_wait_info       lwi;
2662
2663                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2664                                           NULL, NULL);
2665                         l_wait_event(thread->t_ctl_waitq,
2666                                      !thread_is_running(thread),
2667                                      &lwi);
2668                 }
2669
2670                 key = iops->key(env, di);
2671                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2672                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2673                 if (rc == 0)
2674                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2675                                         &com->lc_fid_latest_scanned_phase2);
2676                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2677                         GOTO(fini, rc);
2678
2679                 lfsck_control_speed_by_self(com);
2680                 do {
2681                         rc = iops->next(env, di);
2682                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2683         } while (rc == 0);
2684
2685         GOTO(fini, rc);
2686
2687 fini:
2688         iops->put(env, di);
2689         iops->fini(env, di);
2690 put:
2691         lu_object_put(env, &obj->do_lu);
2692
2693         CDEBUG(D_LFSCK, "%s: finish the orphan scanning for OST%04x, rc = %d\n",
2694                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2695
2696         return rc > 0 ? 0 : rc;
2697 }
2698
2699 /* For the MDT-object with dangling reference, we need to re-create
2700  * the missed OST-object with the known FID/owner information. */
2701 static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
2702                                         struct lfsck_component *com,
2703                                         struct lfsck_layout_req *llr,
2704                                         struct lu_attr *la)
2705 {
2706         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2707         struct filter_fid               *pfid   = &info->lti_new_pfid;
2708         struct dt_allocation_hint       *hint   = &info->lti_hint;
2709         struct dt_object                *parent = llr->llr_parent->llo_obj;
2710         struct dt_object                *child  = llr->llr_child;
2711         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2712         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2713         struct thandle                  *handle;
2714         struct lu_buf                   *buf;
2715         struct lustre_handle             lh     = { 0 };
2716         int                              rc;
2717         ENTRY;
2718
2719         CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
2720                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2721                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2722                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
2723
2724         rc = lfsck_layout_lock(env, com, parent, &lh,
2725                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2726         if (rc != 0)
2727                 RETURN(rc);
2728
2729         handle = dt_trans_create(env, dev);
2730         if (IS_ERR(handle))
2731                 GOTO(unlock1, rc = PTR_ERR(handle));
2732
2733         hint->dah_parent = NULL;
2734         hint->dah_mode = 0;
2735         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2736         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2737         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2738          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2739          * parent MDT-object's layout EA. */
2740         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2741         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2742
2743         rc = dt_declare_create(env, child, la, hint, NULL, handle);
2744         if (rc != 0)
2745                 GOTO(stop, rc);
2746
2747         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2748                                   LU_XATTR_CREATE, handle);
2749         if (rc != 0)
2750                 GOTO(stop, rc);
2751
2752         rc = dt_trans_start(env, dev, handle);
2753         if (rc != 0)
2754                 GOTO(stop, rc);
2755
2756         dt_read_lock(env, parent, 0);
2757         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2758                 GOTO(unlock2, rc = 1);
2759
2760         rc = dt_create(env, child, la, hint, NULL, handle);
2761         if (rc != 0)
2762                 GOTO(unlock2, rc);
2763
2764         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2765                           handle, BYPASS_CAPA);
2766
2767         GOTO(unlock2, rc);
2768
2769 unlock2:
2770         dt_read_unlock(env, parent);
2771
2772 stop:
2773         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2774
2775 unlock1:
2776         lfsck_layout_unlock(&lh);
2777
2778         return rc;
2779 }
2780
2781 /* If the OST-object does not recognize the MDT-object as its parent, and
2782  * there is no other MDT-object claims as its parent, then just trust the
2783  * given MDT-object as its parent. So update the OST-object filter_fid. */
2784 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2785                                               struct lfsck_component *com,
2786                                               struct lfsck_layout_req *llr,
2787                                               const struct lu_attr *pla)
2788 {
2789         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2790         struct filter_fid               *pfid   = &info->lti_new_pfid;
2791         struct lu_attr                  *tla    = &info->lti_la3;
2792         struct dt_object                *parent = llr->llr_parent->llo_obj;
2793         struct dt_object                *child  = llr->llr_child;
2794         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2795         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2796         struct thandle                  *handle;
2797         struct lu_buf                   *buf;
2798         struct lustre_handle             lh     = { 0 };
2799         int                              rc;
2800         ENTRY;
2801
2802         CDEBUG(D_LFSCK, "Repair unmatched MDT-OST pair for: parent "DFID
2803                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2804                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2805                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
2806
2807         rc = lfsck_layout_lock(env, com, parent, &lh,
2808                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2809         if (rc != 0)
2810                 RETURN(rc);
2811
2812         handle = dt_trans_create(env, dev);
2813         if (IS_ERR(handle))
2814                 GOTO(unlock1, rc = PTR_ERR(handle));
2815
2816         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2817         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2818         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2819          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2820          * parent MDT-object's layout EA. */
2821         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2822         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2823
2824         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
2825         if (rc != 0)
2826                 GOTO(stop, rc);
2827
2828         tla->la_valid = LA_UID | LA_GID;
2829         tla->la_uid = pla->la_uid;
2830         tla->la_gid = pla->la_gid;
2831         rc = dt_declare_attr_set(env, child, tla, handle);
2832         if (rc != 0)
2833                 GOTO(stop, rc);
2834
2835         rc = dt_trans_start(env, dev, handle);
2836         if (rc != 0)
2837                 GOTO(stop, rc);
2838
2839         dt_write_lock(env, parent, 0);
2840         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2841                 GOTO(unlock2, rc = 1);
2842
2843         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2844                           BYPASS_CAPA);
2845         if (rc != 0)
2846                 GOTO(unlock2, rc);
2847
2848         /* Get the latest parent's owner. */
2849         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2850         if (rc != 0)
2851                 GOTO(unlock2, rc);
2852
2853         tla->la_valid = LA_UID | LA_GID;
2854         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2855
2856         GOTO(unlock2, rc);
2857
2858 unlock2:
2859         dt_write_unlock(env, parent);
2860
2861 stop:
2862         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2863
2864 unlock1:
2865         lfsck_layout_unlock(&lh);
2866
2867         return rc;
2868 }
2869
2870 /* If there are more than one MDT-objects claim as the OST-object's parent,
2871  * and the OST-object only recognizes one of them, then we need to generate
2872  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
2873 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2874                                                    struct lfsck_component *com,
2875                                                    struct lfsck_layout_req *llr,
2876                                                    struct lu_attr *la,
2877                                                    struct lu_buf *buf)
2878 {
2879         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2880         struct dt_allocation_hint       *hint   = &info->lti_hint;
2881         struct dt_object_format         *dof    = &info->lti_dof;
2882         struct dt_device                *pdev   = com->lc_lfsck->li_next;
2883         struct ost_id                   *oi     = &info->lti_oi;
2884         struct dt_object                *parent = llr->llr_parent->llo_obj;
2885         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
2886         struct dt_object                *child  = NULL;
2887         struct lu_device                *d      = &cdev->dd_lu_dev;
2888         struct lu_object                *o      = NULL;
2889         struct thandle                  *handle;
2890         struct lov_mds_md_v1            *lmm;
2891         struct lov_ost_data_v1          *objs;
2892         struct lustre_handle             lh     = { 0 };
2893         __u32                            magic;
2894         int                              rc;
2895         ENTRY;
2896
2897         CDEBUG(D_LFSCK, "Repair multiple references for: parent "DFID
2898                ", OST-index %u, stripe-index %u, owner %u:%u\n",
2899                PFID(lfsck_dto2fid(parent)), llr->llr_ost_idx,
2900                llr->llr_lov_idx, la->la_uid, la->la_gid);
2901
2902         rc = lfsck_layout_lock(env, com, parent, &lh,
2903                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2904         if (rc != 0)
2905                 RETURN(rc);
2906
2907         handle = dt_trans_create(env, pdev);
2908         if (IS_ERR(handle))
2909                 GOTO(unlock1, rc = PTR_ERR(handle));
2910
2911         o = lu_object_anon(env, d, NULL);
2912         if (IS_ERR(o))
2913                 GOTO(stop, rc = PTR_ERR(o));
2914
2915         child = container_of(o, struct dt_object, do_lu);
2916         o = lu_object_locate(o->lo_header, d->ld_type);
2917         if (unlikely(o == NULL))
2918                 GOTO(stop, rc = -EINVAL);
2919
2920         child = container_of(o, struct dt_object, do_lu);
2921         la->la_valid = LA_UID | LA_GID;
2922         hint->dah_parent = NULL;
2923         hint->dah_mode = 0;
2924         dof->dof_type = DFT_REGULAR;
2925         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2926         if (rc != 0)
2927                 GOTO(stop, rc);
2928
2929         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2930                                   LU_XATTR_REPLACE, handle);
2931         if (rc != 0)
2932                 GOTO(stop, rc);
2933
2934         rc = dt_trans_start(env, pdev, handle);
2935         if (rc != 0)
2936                 GOTO(stop, rc);
2937
2938         dt_write_lock(env, parent, 0);
2939         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2940                 GOTO(unlock2, rc = 0);
2941
2942         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2943         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2944                 GOTO(unlock2, rc = 0);
2945
2946         lmm = buf->lb_buf;
2947         rc = lfsck_layout_verify_header(lmm);
2948         if (rc != 0)
2949                 GOTO(unlock2, rc);
2950
2951         /* Someone change layout during the LFSCK, no need to repair then. */
2952         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2953                 GOTO(unlock2, rc = 0);
2954
2955         rc = dt_create(env, child, la, hint, dof, handle);
2956         if (rc != 0)
2957                 GOTO(unlock2, rc);
2958
2959         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2960          * been verified in lfsck_layout_verify_header() already. If some
2961          * new magic introduced in the future, then layout LFSCK needs to
2962          * be updated also. */
2963         magic = le32_to_cpu(lmm->lmm_magic);
2964         if (magic == LOV_MAGIC_V1) {
2965                 objs = &(lmm->lmm_objects[0]);
2966         } else {
2967                 LASSERT(magic == LOV_MAGIC_V3);
2968                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2969         }
2970
2971         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2972         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2973         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2974         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2975         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2976         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2977                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2978
2979         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2980
2981 unlock2:
2982         dt_write_unlock(env, parent);
2983
2984 stop:
2985         if (child != NULL)
2986                 lu_object_put(env, &child->do_lu);
2987
2988         dt_trans_stop(env, pdev, handle);
2989
2990 unlock1:
2991         lfsck_layout_unlock(&lh);
2992
2993         return rc;
2994 }
2995
2996 /* If the MDT-object and the OST-object have different owner information,
2997  * then trust the MDT-object, because the normal chown/chgrp handle order
2998  * is from MDT to OST, and it is possible that some chown/chgrp operation
2999  * is partly done. */
3000 static int lfsck_layout_repair_owner(const struct lu_env *env,
3001                                      struct lfsck_component *com,
3002                                      struct lfsck_layout_req *llr,
3003                                      struct lu_attr *pla)
3004 {
3005         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3006         struct lu_attr                  *tla    = &info->lti_la3;
3007         struct dt_object                *parent = llr->llr_parent->llo_obj;
3008         struct dt_object                *child  = llr->llr_child;
3009         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3010         struct thandle                  *handle;
3011         int                              rc;
3012         ENTRY;
3013
3014         CDEBUG(D_LFSCK, "Repair inconsistent file owner for: parent "DFID
3015                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
3016                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
3017                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
3018
3019         handle = dt_trans_create(env, dev);
3020         if (IS_ERR(handle))
3021                 RETURN(PTR_ERR(handle));
3022
3023         tla->la_uid = pla->la_uid;
3024         tla->la_gid = pla->la_gid;
3025         tla->la_valid = LA_UID | LA_GID;
3026         rc = dt_declare_attr_set(env, child, tla, handle);
3027         if (rc != 0)
3028                 GOTO(stop, rc);
3029
3030         rc = dt_trans_start(env, dev, handle);
3031         if (rc != 0)
3032                 GOTO(stop, rc);
3033
3034         /* Use the dt_object lock to serialize with destroy and attr_set. */
3035         dt_read_lock(env, parent, 0);
3036         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3037                 GOTO(unlock, rc = 1);
3038
3039         /* Get the latest parent's owner. */
3040         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3041         if (rc != 0) {
3042                 CWARN("%s: fail to get the latest parent's ("DFID") owner, "
3043                       "not sure whether some others chown/chgrp during the "
3044                       "LFSCK: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
3045                       PFID(lfsck_dto2fid(parent)), rc);
3046
3047                 GOTO(unlock, rc);
3048         }
3049
3050         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3051         if (unlikely(tla->la_uid != pla->la_uid ||
3052                      tla->la_gid != pla->la_gid))
3053                 GOTO(unlock, rc = 1);
3054
3055         tla->la_valid = LA_UID | LA_GID;
3056         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3057
3058         GOTO(unlock, rc);
3059
3060 unlock:
3061         dt_read_unlock(env, parent);
3062
3063 stop:
3064         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3065
3066         return rc;
3067 }
3068
3069 /* Check whether the OST-object correctly back points to the
3070  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3071 static int lfsck_layout_check_parent(const struct lu_env *env,
3072                                      struct lfsck_component *com,
3073                                      struct dt_object *parent,
3074                                      const struct lu_fid *pfid,
3075                                      const struct lu_fid *cfid,
3076                                      const struct lu_attr *pla,
3077                                      const struct lu_attr *cla,
3078                                      struct lfsck_layout_req *llr,
3079                                      struct lu_buf *lov_ea, __u32 idx)
3080 {
3081         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3082         struct lu_buf                   *buf    = &info->lti_big_buf;
3083         struct dt_object                *tobj;
3084         struct lov_mds_md_v1            *lmm;
3085         struct lov_ost_data_v1          *objs;
3086         int                              rc;
3087         int                              i;
3088         __u32                            magic;
3089         __u16                            count;
3090         ENTRY;
3091
3092         if (fid_is_zero(pfid)) {
3093                 /* client never wrote. */
3094                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3095                         if (unlikely(cla->la_uid != pla->la_uid ||
3096                                      cla->la_gid != pla->la_gid))
3097                                 RETURN (LLIT_INCONSISTENT_OWNER);
3098
3099                         RETURN(0);
3100                 }
3101
3102                 RETURN(LLIT_UNMATCHED_PAIR);
3103         }
3104
3105         if (unlikely(!fid_is_sane(pfid)))
3106                 RETURN(LLIT_UNMATCHED_PAIR);
3107
3108         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3109                 if (llr->llr_lov_idx == idx)
3110                         RETURN(0);
3111
3112                 RETURN(LLIT_UNMATCHED_PAIR);
3113         }
3114
3115         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3116         if (tobj == NULL)
3117                 RETURN(LLIT_UNMATCHED_PAIR);
3118
3119         if (IS_ERR(tobj))
3120                 RETURN(PTR_ERR(tobj));
3121
3122         if (!dt_object_exists(tobj))
3123                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3124
3125         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3126          * remote one on another MDT. Then check whether the given OST-object
3127          * is in such layout. If yes, it is multiple referenced, otherwise it
3128          * is unmatched referenced case. */
3129         rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
3130         if (rc == 0)
3131                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3132
3133         if (rc < 0)
3134                 GOTO(out, rc);
3135
3136         lmm = buf->lb_buf;
3137         rc = lfsck_layout_verify_header(lmm);
3138         if (rc != 0)
3139                 GOTO(out, rc);
3140
3141         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3142          * been verified in lfsck_layout_verify_header() already. If some
3143          * new magic introduced in the future, then layout LFSCK needs to
3144          * be updated also. */
3145         magic = le32_to_cpu(lmm->lmm_magic);
3146         if (magic == LOV_MAGIC_V1) {
3147                 objs = &(lmm->lmm_objects[0]);
3148         } else {
3149                 LASSERT(magic == LOV_MAGIC_V3);
3150                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3151         }
3152
3153         count = le16_to_cpu(lmm->lmm_stripe_count);
3154         for (i = 0; i < count; i++, objs++) {
3155                 struct lu_fid           *tfid   = &info->lti_fid2;
3156                 struct ost_id           *oi     = &info->lti_oi;
3157
3158                 if (is_dummy_lov_ost_data(objs))
3159                         continue;
3160
3161                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3162                 ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
3163                 if (lu_fid_eq(cfid, tfid)) {
3164                         *lov_ea = *buf;
3165
3166                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3167                 }
3168         }
3169
3170         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3171
3172 out:
3173         lfsck_object_put(env, tobj);
3174
3175         return rc;
3176 }
3177
3178 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3179                                              struct lfsck_component *com,
3180                                              struct lfsck_layout_req *llr)
3181 {
3182         struct lfsck_layout                  *lo     = com->lc_file_ram;
3183         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3184         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3185         struct lu_fid                        *pfid   = &info->lti_fid;
3186         struct lu_buf                        *buf    = NULL;
3187         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3188         struct dt_object                     *child  = llr->llr_child;
3189         struct lu_attr                       *pla    = &info->lti_la;
3190         struct lu_attr                       *cla    = &info->lti_la2;
3191         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3192         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3193         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3194         __u32                                 idx    = 0;
3195         int                                   rc;
3196         ENTRY;
3197
3198         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3199         if (rc != 0) {
3200                 if (lu_object_is_dying(parent->do_lu.lo_header))
3201                         RETURN(0);
3202
3203                 GOTO(out, rc);
3204         }
3205
3206         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3207         if (rc == -ENOENT) {
3208                 if (lu_object_is_dying(parent->do_lu.lo_header))
3209                         RETURN(0);
3210
3211                 type = LLIT_DANGLING;
3212                 goto repair;
3213         }
3214
3215         if (rc != 0)
3216                 GOTO(out, rc);
3217
3218         buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
3219         rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
3220         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3221                      rc != sizeof(struct filter_fid))) {
3222                 type = LLIT_UNMATCHED_PAIR;
3223                 goto repair;
3224         }
3225
3226         if (rc < 0 && rc != -ENODATA)
3227                 GOTO(out, rc);
3228
3229         if (rc == -ENODATA) {
3230                 fid_zero(pfid);
3231         } else {
3232                 fid_le_to_cpu(pfid, &pea->ff_parent);
3233                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3234                  * real parent MDT-object's FID::f_ver, instead it is the
3235                  * OST-object index in its parent MDT-object's layout EA. */
3236                 idx = pfid->f_stripe_idx;
3237                 pfid->f_ver = 0;
3238         }
3239
3240         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3241                                        lu_object_fid(&child->do_lu),
3242                                        pla, cla, llr, buf, idx);
3243         if (rc > 0) {
3244                 type = rc;
3245                 goto repair;
3246         }
3247
3248         if (rc < 0)
3249                 GOTO(out, rc);
3250
3251         if (unlikely(cla->la_uid != pla->la_uid ||
3252                      cla->la_gid != pla->la_gid)) {
3253                 type = LLIT_INCONSISTENT_OWNER;
3254                 goto repair;
3255         }
3256
3257 repair:
3258         if (bk->lb_param & LPF_DRYRUN) {
3259                 if (type != LLIT_NONE)
3260                         GOTO(out, rc = 1);
3261                 else
3262                         GOTO(out, rc = 0);
3263         }
3264
3265         switch (type) {
3266         case LLIT_DANGLING:
3267                 memset(cla, 0, sizeof(*cla));
3268                 cla->la_uid = pla->la_uid;
3269                 cla->la_gid = pla->la_gid;
3270                 cla->la_mode = S_IFREG | 0666;
3271                 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
3272                                 LA_ATIME | LA_MTIME | LA_CTIME;
3273                 rc = lfsck_layout_recreate_ostobj(env, com, llr, cla);
3274                 break;
3275         case LLIT_UNMATCHED_PAIR:
3276                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3277                 break;
3278         case LLIT_MULTIPLE_REFERENCED:
3279                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3280                                                              pla, buf);
3281                 break;
3282         case LLIT_INCONSISTENT_OWNER:
3283                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3284                 break;
3285         default:
3286                 rc = 0;
3287                 break;
3288         }
3289
3290         GOTO(out, rc);
3291
3292 out:
3293         down_write(&com->lc_sem);
3294         if (rc < 0) {
3295                 /* If cannot touch the target server,
3296                  * mark the LFSCK as INCOMPLETE. */
3297                 if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT ||
3298                     rc == -EHOSTDOWN || rc == -EHOSTUNREACH) {
3299                         CERROR("%s: Fail to talk with OST %x: rc = %d.\n",
3300                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3301                         lo->ll_flags |= LF_INCOMPLETE;
3302                         lo->ll_objs_skipped++;
3303                         rc = 0;
3304                 } else {
3305                         lo->ll_objs_failed_phase1++;
3306                 }
3307         } else if (rc > 0) {
3308                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3309                          "unknown type = %d\n", type);
3310
3311                 lo->ll_objs_repaired[type - 1]++;
3312         }
3313         up_write(&com->lc_sem);
3314
3315         return rc;
3316 }
3317
/*
 * Main function of the layout LFSCK assistant thread.
 *
 * @args is a struct lfsck_thread_args that supplies the lu_env, the LFSCK
 * instance and the layout component; it is released with
 * lfsck_thread_args_fini() before returning.
 *
 * The thread sends an LE_START request through
 * lfsck_layout_master_notify_others(), marks itself SVC_RUNNING and then
 * loops: it drains llmd_req_list via lfsck_layout_assistant_handle_one(),
 * sleeps until a new request arrives or one of llmd_exit / llmd_to_post /
 * llmd_to_double_scan is set, and acts on the latter two (LE_PHASE1_DONE
 * notification, respectively the double-scan loop which queries the other
 * targets and runs lfsck_layout_scan_orphan()).
 *
 * On the way out a final event (LE_PHASE2_DONE, LE_STOP or LE_PEER_EXIT,
 * chosen from rc) is sent, llmd_assistant_status is recorded and the thread
 * is flagged SVC_STOPPED.
 *
 * Returns the last rc; it is replaced by the result of the final
 * notification when that notification fails.
 */
3318 static int lfsck_layout_assistant(void *args)
3319 {
3320         struct lfsck_thread_args        *lta     = args;
3321         struct lu_env                   *env     = &lta->lta_env;
3322         struct lfsck_component          *com     = lta->lta_com;
3323         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3324         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3325         struct lfsck_position           *pos     = &com->lc_pos_start;
3326         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3327         struct lfsck_request            *lr      = &info->lti_lr;
3328         struct lfsck_layout_master_data *llmd    = com->lc_data;
3329         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3330         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3331         struct lfsck_layout_req         *llr;
3332         struct l_wait_info               lwi     = { 0 };
3333         int                              rc      = 0;
3334         int                              rc1     = 0;
3335         ENTRY;
3336
        /* Build the LE_START request from the current bookmark settings. */
3337         memset(lr, 0, sizeof(*lr));
3338         lr->lr_event = LE_START;
3339         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3340                        LSV_ASYNC_WINDOWS;
3341         lr->lr_speed = bk->lb_speed_limit;
3342         lr->lr_version = bk->lb_version;
3343         lr->lr_param = bk->lb_param;
3344         lr->lr_async_windows = bk->lb_async_windows;
3345         lr->lr_flags = LEF_TO_OST;
        /* NOTE(review): a start cookie <= 1 presumably means the scan begins
         * from scratch, hence the reset request - confirm. */
3346         if (pos->lp_oit_cookie <= 1)
3347                 lr->lr_param |= LPF_RESET;
3348
3349         rc = lfsck_layout_master_notify_others(env, com, lr);
3350         if (rc != 0) {
3351                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
3352                        lfsck_lfsck2name(lfsck), rc);
3353                 GOTO(fini, rc);
3354         }
3355
        /* Publish the running state and wake up the waiters on the master
         * engine thread's wait queue. */
3356         spin_lock(&llmd->llmd_lock);
3357         thread_set_flags(athread, SVC_RUNNING);
3358         spin_unlock(&llmd->llmd_lock);
3359         wake_up_all(&mthread->t_ctl_waitq);
3360
3361         while (1) {
                /* Drain the queued requests one by one. */
3362                 while (!list_empty(&llmd->llmd_req_list)) {
3363                         bool wakeup = false;
3364
3365                         if (unlikely(llmd->llmd_exit))
3366                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3367
3368                         llr = list_entry(llmd->llmd_req_list.next,
3369                                          struct lfsck_layout_req,
3370                                          llr_list);
3371                         /* Only the lfsck_layout_assistant thread itself can
3372                          * remove the "llr" from the head of the list, LFSCK
3373                          * engine thread only inserts other new "llr" at the
3374                          * end of the list. So it is safe to handle current
3375                          * "llr" without the spin_lock. */
3376                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3377                         spin_lock(&llmd->llmd_lock);
3378                         list_del_init(&llr->llr_list);
3379                         llmd->llmd_prefetched--;
3380                         /* Wake up the main engine thread only when the list
3381                          * is empty or half of the prefetched items have been
3382                          * handled to avoid too frequent thread schedule. */
3383                         if (llmd->llmd_prefetched == 0 ||
3384                             (bk->lb_async_windows != 0 &&
3385                              bk->lb_async_windows / 2 ==
3386                              llmd->llmd_prefetched))
3387                                 wakeup = true;
3388                         spin_unlock(&llmd->llmd_lock);
3389                         if (wakeup)
3390                                 wake_up_all(&mthread->t_ctl_waitq);
3391
3392                         lfsck_layout_req_fini(env, llr);
                        /* A failure aborts the thread only in failout mode. */
3393                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3394                                 GOTO(cleanup1, rc);
3395                 }
3396
                /* Sleep until there is something to do. */
3397                 l_wait_event(athread->t_ctl_waitq,
3398                              !lfsck_layout_req_empty(llmd) ||
3399                              llmd->llmd_exit ||
3400                              llmd->llmd_to_post ||
3401                              llmd->llmd_to_double_scan,
3402                              &lwi);
3403
3404                 if (unlikely(llmd->llmd_exit))
3405                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3406
                /* New requests take precedence over post/double-scan. */
3407                 if (!list_empty(&llmd->llmd_req_list))
3408                         continue;
3409
3410                 if (llmd->llmd_to_post) {
3411                         llmd->llmd_to_post = 0;
3412                         LASSERT(llmd->llmd_post_result > 0);
3413
3414                         memset(lr, 0, sizeof(*lr));
3415                         lr->lr_event = LE_PHASE1_DONE;
3416                         lr->lr_status = llmd->llmd_post_result;
3417                         rc = lfsck_layout_master_notify_others(env, com, lr);
3418                         if (rc != 0)
3419                                 CERROR("%s: failed to notify others "
3420                                        "for layout post: rc = %d\n",
3421                                        lfsck_lfsck2name(lfsck), rc);
3422
3423                         /* Wakeup the master engine to go ahead. */
3424                         wake_up_all(&mthread->t_ctl_waitq);
3425                 }
3426
3427                 if (llmd->llmd_to_double_scan) {
3428                         llmd->llmd_to_double_scan = 0;
                        /* This increment is undone at the "fini" label, which
                         * tests llmd_in_double_scan. */
3429                         atomic_inc(&lfsck->li_double_scan_count);
3430                         llmd->llmd_in_double_scan = 1;
3431                         wake_up_all(&mthread->t_ctl_waitq);
3432
                        /* Restart the checkpoint bookkeeping. */
3433                         com->lc_new_checked = 0;
3434                         com->lc_new_scanned = 0;
3435                         com->lc_time_last_checkpoint = cfs_time_current();
3436                         com->lc_time_next_checkpoint =
3437                                 com->lc_time_last_checkpoint +
3438                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3439
3440                         while (llmd->llmd_in_double_scan) {
3441                                 struct lfsck_tgt_descs  *ltds =
3442                                                         &lfsck->li_ost_descs;
3443                                 struct lfsck_tgt_desc   *ltd;
3444
3445                                 rc = lfsck_layout_master_query_others(env, com);
                                /* The orphan check comes before the error
                                 * check on the query result. */
3446                                 if (lfsck_layout_master_to_orphan(llmd))
3447                                         goto orphan;
3448
3449                                 if (rc < 0)
3450                                         GOTO(cleanup2, rc);
3451
3452                                 /* Pull LFSCK status on related targets once
3453                                  * per 30 seconds if we are not notified. */
3454                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3455                                                            cfs_time_seconds(1),
3456                                                            NULL, NULL);
3457                                 rc = l_wait_event(athread->t_ctl_waitq,
3458                                         lfsck_layout_master_to_orphan(llmd) ||
3459                                         llmd->llmd_exit ||
3460                                         !thread_is_running(mthread),
3461                                         &lwi);
3462
3463                                 if (unlikely(llmd->llmd_exit ||
3464                                              !thread_is_running(mthread)))
3465                                         GOTO(cleanup2, rc = 0);
3466
                                /* Timed out: query the targets again. */
3467                                 if (rc == -ETIMEDOUT)
3468                                         continue;
3469
3470                                 if (rc < 0)
3471                                         GOTO(cleanup2, rc);
3472
3473 orphan:
                                /* Handle every target on the phase2 list;
                                 * ltd_lock is dropped around each target. */
3474                                 spin_lock(&ltds->ltd_lock);
3475                                 while (!list_empty(
3476                                                 &llmd->llmd_ost_phase2_list)) {
3477                                         ltd = list_entry(
3478                                               llmd->llmd_ost_phase2_list.next,
3479                                               struct lfsck_tgt_desc,
3480                                               ltd_layout_phase_list);
3481                                         list_del_init(
3482                                                 &ltd->ltd_layout_phase_list);
3483                                         spin_unlock(&ltds->ltd_lock);
3484
3485                                         if (bk->lb_param & LPF_ALL_TGT) {
3486                                                 rc = lfsck_layout_scan_orphan(
3487                                                                 env, com, ltd);
3488                                                 if (rc != 0 &&
3489                                                     bk->lb_param & LPF_FAILOUT)
3490                                                         GOTO(cleanup2, rc);
3491                                         }
3492
3493                                         if (unlikely(llmd->llmd_exit ||
3494                                                 !thread_is_running(mthread)))
3495                                                 GOTO(cleanup2, rc = 0);
3496
3497                                         spin_lock(&ltds->ltd_lock);
3498                                 }
3499
                                /* Nothing left on the phase1 list: leave
                                 * with rc = 1. */
3500                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3501                                         spin_unlock(&ltds->ltd_lock);
3502                                         GOTO(cleanup2, rc = 1);
3503                                 }
3504                                 spin_unlock(&ltds->ltd_lock);
3505                         }
3506                 }
3507         }
3508
3509 cleanup1:
3510         /* Cleanup the unfinished requests. */
3511         spin_lock(&llmd->llmd_lock);
3512         if (rc < 0)
3513                 llmd->llmd_assistant_status = rc;
3514
        /* llmd_lock is dropped around lfsck_layout_req_fini(). */
3515         while (!list_empty(&llmd->llmd_req_list)) {
3516                 llr = list_entry(llmd->llmd_req_list.next,
3517                                  struct lfsck_layout_req,
3518                                  llr_list);
3519                 list_del_init(&llr->llr_list);
3520                 llmd->llmd_prefetched--;
3521                 spin_unlock(&llmd->llmd_lock);
3522                 lfsck_layout_req_fini(env, llr);
3523                 spin_lock(&llmd->llmd_lock);
3524         }
3525         spin_unlock(&llmd->llmd_lock);
3526
3527         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3528                  llmd->llmd_prefetched);
3529
3530 cleanup2:
        /* Choose the final event and status to send from rc:
         * positive, zero or negative. */
3531         memset(lr, 0, sizeof(*lr));
3532         if (rc > 0) {
3533                 lr->lr_event = LE_PHASE2_DONE;
3534                 lr->lr_status = rc;
3535         } else if (rc == 0) {
3536                 if (lfsck->li_flags & LPF_ALL_TGT) {
3537                         lr->lr_event = LE_STOP;
3538                         lr->lr_status = LS_STOPPED;
3539                 } else {
3540                         lr->lr_event = LE_PEER_EXIT;
3541                         switch (lfsck->li_status) {
3542                         case LS_PAUSED:
3543                         case LS_CO_PAUSED:
3544                                 lr->lr_status = LS_CO_PAUSED;
3545                                 break;
3546                         case LS_STOPPED:
3547                         case LS_CO_STOPPED:
3548                                 lr->lr_status = LS_CO_STOPPED;
3549                                 break;
3550                         default:
3551                                 CERROR("%s: unknown status: rc = %d\n",
3552                                        lfsck_lfsck2name(lfsck),
3553                                        lfsck->li_status);
3554                                 lr->lr_status = LS_CO_FAILED;
3555                                 break;
3556                         }
3557                 }
3558         } else {
3559                 if (lfsck->li_flags & LPF_ALL_TGT) {
3560                         lr->lr_event = LE_STOP;
3561                         lr->lr_status = LS_FAILED;
3562                 } else {
3563                         lr->lr_event = LE_PEER_EXIT;
3564                         lr->lr_status = LS_CO_FAILED;
3565                 }
3566         }
3567
3568         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3569         if (rc1 != 0) {
3570                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
3571                        lfsck_lfsck2name(lfsck), rc1);
3572                 rc = rc1;
3573         }
3574
3575         /* Under force exit case, some requests may be just freed without
3576          * verification, those objects should be re-handled when next run.
3577          * So not update the on-disk tracing file under such case. */
3578         if (!llmd->llmd_exit)
3579                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
3580
3581 fini:
        /* Balance the atomic_inc() done when the double scan was entered. */
3582         if (llmd->llmd_in_double_scan)
3583                 atomic_dec(&lfsck->li_double_scan_count);
3584
        /* Record the final status (rc1 wins over rc when non-zero) and
         * announce that the assistant has stopped. */
3585         spin_lock(&llmd->llmd_lock);
3586         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3587         thread_set_flags(athread, SVC_STOPPED);
3588         wake_up_all(&mthread->t_ctl_waitq);
3589         spin_unlock(&llmd->llmd_lock);
3590         lfsck_thread_args_fini(lta);
3591
3592         return rc;
3593 }
3594
3595 static int
3596 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3597                                    struct ptlrpc_request *req,
3598                                    void *args, int rc)
3599 {
3600         struct lfsck_layout_slave_async_args *llsaa = args;
3601         struct obd_export                    *exp   = llsaa->llsaa_exp;
3602         struct lfsck_component               *com   = llsaa->llsaa_com;
3603         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3604         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3605         bool                                  done  = false;
3606
3607         if (rc != 0) {
3608                 /* It is quite probably caused by target crash,
3609                  * to make the LFSCK can go ahead, assume that
3610                  * the target finished the LFSCK prcoessing. */
3611                 done = true;
3612         } else {
3613                 struct lfsck_reply *lr;
3614
3615                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3616                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3617                     lr->lr_status != LS_SCANNING_PHASE2)
3618                         done = true;
3619         }
3620         if (done)
3621                 lfsck_layout_llst_del(llsd, llst);
3622         lfsck_layout_llst_put(llst);
3623         lfsck_component_put(env, com);
3624         class_export_put(exp);
3625
3626         return 0;
3627 }
3628
3629 static int lfsck_layout_async_query(const struct lu_env *env,
3630                                     struct lfsck_component *com,
3631                                     struct obd_export *exp,
3632                                     struct lfsck_layout_slave_target *llst,
3633                                     struct lfsck_request *lr,
3634                                     struct ptlrpc_request_set *set)
3635 {
3636         struct lfsck_layout_slave_async_args *llsaa;
3637         struct ptlrpc_request                *req;
3638         struct lfsck_request                 *tmp;
3639         int                                   rc;
3640         ENTRY;
3641
3642         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3643         if (req == NULL)
3644                 RETURN(-ENOMEM);
3645
3646         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3647         if (rc != 0) {
3648                 ptlrpc_request_free(req);
3649                 RETURN(rc);
3650         }
3651
3652         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3653         *tmp = *lr;
3654         ptlrpc_request_set_replen(req);
3655
3656         llsaa = ptlrpc_req_async_args(req);
3657         llsaa->llsaa_exp = exp;
3658         llsaa->llsaa_com = lfsck_component_get(com);
3659         llsaa->llsaa_llst = llst;
3660         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3661         ptlrpc_set_add_req(set, req);
3662
3663         RETURN(0);
3664 }
3665
3666 static int lfsck_layout_async_notify(const struct lu_env *env,
3667                                      struct obd_export *exp,
3668                                      struct lfsck_request *lr,
3669                                      struct ptlrpc_request_set *set)
3670 {
3671         struct ptlrpc_request   *req;
3672         struct lfsck_request    *tmp;
3673         int                      rc;
3674         ENTRY;
3675
3676         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3677         if (req == NULL)
3678                 RETURN(-ENOMEM);
3679
3680         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3681         if (rc != 0) {
3682                 ptlrpc_request_free(req);
3683                 RETURN(rc);
3684         }
3685
3686         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3687         *tmp = *lr;
3688         ptlrpc_request_set_replen(req);
3689         ptlrpc_set_add_req(set, req);
3690
3691         RETURN(0);
3692 }
3693
3694 static int
3695 lfsck_layout_slave_query_master(const struct lu_env *env,
3696                                 struct lfsck_component *com)
3697 {
3698         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3699         struct lfsck_instance            *lfsck = com->lc_lfsck;
3700         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3701         struct lfsck_layout_slave_target *llst;
3702         struct obd_export                *exp;
3703         struct ptlrpc_request_set        *set;
3704         int                               rc    = 0;
3705         int                               rc1   = 0;
3706         ENTRY;
3707
3708         set = ptlrpc_prep_set();
3709         if (set == NULL)
3710                 RETURN(-ENOMEM);
3711
3712         memset(lr, 0, sizeof(*lr));
3713         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3714         lr->lr_event = LE_QUERY;
3715         lr->lr_active = LT_LAYOUT;
3716
3717         llsd->llsd_touch_gen++;
3718         spin_lock(&llsd->llsd_lock);
3719         while (!list_empty(&llsd->llsd_master_list)) {
3720                 llst = list_entry(llsd->llsd_master_list.next,
3721                                   struct lfsck_layout_slave_target,
3722                                   llst_list);
3723                 if (llst->llst_gen == llsd->llsd_touch_gen)
3724                         break;
3725
3726                 llst->llst_gen = llsd->llsd_touch_gen;
3727                 list_del(&llst->llst_list);
3728                 list_add_tail(&llst->llst_list,
3729                               &llsd->llsd_master_list);
3730                 atomic_inc(&llst->llst_ref);
3731                 spin_unlock(&llsd->llsd_lock);
3732
3733                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3734                                                llst->llst_index);
3735                 if (exp == NULL) {
3736                         lfsck_layout_llst_del(llsd, llst);
3737                         lfsck_layout_llst_put(llst);
3738                         spin_lock(&llsd->llsd_lock);
3739                         continue;
3740                 }
3741
3742                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3743                 if (rc != 0) {
3744                         CERROR("%s: slave fail to query %s for layout: "
3745                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3746                                exp->exp_obd->obd_name, rc);
3747                         rc1 = rc;
3748                         lfsck_layout_llst_put(llst);
3749                         class_export_put(exp);
3750                 }
3751                 spin_lock(&llsd->llsd_lock);
3752         }
3753         spin_unlock(&llsd->llsd_lock);
3754
3755         rc = ptlrpc_set_wait(set);
3756         ptlrpc_set_destroy(set);
3757
3758         RETURN(rc1 != 0 ? rc1 : rc);
3759 }
3760
3761 static void
3762 lfsck_layout_slave_notify_master(const struct lu_env *env,
3763                                  struct lfsck_component *com,
3764                                  enum lfsck_events event, int result)
3765 {
3766         struct lfsck_instance            *lfsck = com->lc_lfsck;
3767         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3768         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3769         struct lfsck_layout_slave_target *llst;
3770         struct obd_export                *exp;
3771         struct ptlrpc_request_set        *set;
3772         int                               rc;
3773         ENTRY;
3774
3775         set = ptlrpc_prep_set();
3776         if (set == NULL)
3777                 RETURN_EXIT;
3778
3779         memset(lr, 0, sizeof(*lr));
3780         lr->lr_event = event;
3781         lr->lr_flags = LEF_FROM_OST;
3782         lr->lr_status = result;
3783         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3784         lr->lr_active = LT_LAYOUT;
3785         llsd->llsd_touch_gen++;
3786         spin_lock(&llsd->llsd_lock);
3787         while (!list_empty(&llsd->llsd_master_list)) {
3788                 llst = list_entry(llsd->llsd_master_list.next,
3789                                   struct lfsck_layout_slave_target,
3790                                   llst_list);
3791                 if (llst->llst_gen == llsd->llsd_touch_gen)
3792                         break;
3793
3794                 llst->llst_gen = llsd->llsd_touch_gen;
3795                 list_del(&llst->llst_list);
3796                 list_add_tail(&llst->llst_list,
3797                               &llsd->llsd_master_list);
3798                 atomic_inc(&llst->llst_ref);
3799                 spin_unlock(&llsd->llsd_lock);
3800
3801                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3802                                                llst->llst_index);
3803                 if (exp == NULL) {
3804                         lfsck_layout_llst_del(llsd, llst);
3805                         lfsck_layout_llst_put(llst);
3806                         spin_lock(&llsd->llsd_lock);
3807                         continue;
3808                 }
3809
3810                 rc = lfsck_layout_async_notify(env, exp, lr, set);
3811                 if (rc != 0)
3812                         CERROR("%s: slave fail to notify %s for layout: "
3813                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3814                                exp->exp_obd->obd_name, rc);
3815                 lfsck_layout_llst_put(llst);
3816                 class_export_put(exp);
3817                 spin_lock(&llsd->llsd_lock);
3818         }
3819         spin_unlock(&llsd->llsd_lock);
3820
3821         ptlrpc_set_wait(set);
3822         ptlrpc_set_destroy(set);
3823
3824         RETURN_EXIT;
3825 }
3826
3827 /*
3828  * \ret -ENODATA: unrecognized stripe
3829  * \ret = 0     : recognized stripe
3830  * \ret < 0     : other failures
3831  */
3832 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
3833                                            struct lfsck_component *com,
3834                                            struct lu_fid *cfid,
3835                                            struct lu_fid *pfid)
3836 {
3837         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3838         struct lu_buf                   *buf    = &info->lti_big_buf;
3839         struct ost_id                   *oi     = &info->lti_oi;
3840         struct dt_object                *obj;
3841         struct lov_mds_md_v1            *lmm;
3842         struct lov_ost_data_v1          *objs;
3843         __u32                            idx    = pfid->f_stripe_idx;
3844         __u32                            magic;
3845         int                              rc     = 0;
3846         int                              i;
3847         __u16                            count;
3848         ENTRY;
3849
3850         pfid->f_ver = 0;
3851         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
3852         if (IS_ERR(obj))
3853                 RETURN(PTR_ERR(obj));
3854
3855         dt_read_lock(env, obj, 0);
3856         if (unlikely(!dt_object_exists(obj)))
3857                 GOTO(unlock, rc = -ENOENT);
3858
3859         rc = lfsck_layout_get_lovea(env, obj, buf, NULL);
3860         if (rc < 0)
3861                 GOTO(unlock, rc);
3862
3863         if (rc == 0)
3864                 GOTO(unlock, rc = -ENODATA);
3865
3866         lmm = buf->lb_buf;
3867         rc = lfsck_layout_verify_header(lmm);
3868         if (rc != 0)
3869                 GOTO(unlock, rc);
3870
3871         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3872          * been verified in lfsck_layout_verify_header() already. If some
3873          * new magic introduced in the future, then layout LFSCK needs to
3874          * be updated also. */
3875         magic = le32_to_cpu(lmm->lmm_magic);
3876         if (magic == LOV_MAGIC_V1) {
3877                 objs = &(lmm->lmm_objects[0]);
3878         } else {
3879                 LASSERT(magic == LOV_MAGIC_V3);
3880                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3881         }
3882
3883         fid_to_ostid(cfid, oi);
3884         count = le16_to_cpu(lmm->lmm_stripe_count);
3885         for (i = 0; i < count; i++, objs++) {
3886                 struct ost_id oi2;
3887
3888                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
3889                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
3890                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
3891         }
3892
3893         GOTO(unlock, rc = -ENODATA);
3894
3895 unlock:
3896         dt_read_unlock(env, obj);
3897         lu_object_put(env, &obj->do_lu);
3898
3899         return rc;
3900 }
3901
3902 /*
3903  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
3904  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
3905  * layout EA from MDT to OST. On one hand, the OST no need to understand
3906  * the layout EA structure; on the other hand, it may cause trouble when
3907  * transfer large layout EA from MDT to OST via normal OUT RPC.
3908  *
3909  * \ret > 0: unrecognized stripe
3910  * \ret = 0: recognized stripe
3911  * \ret < 0: other failures
3912  */
3913 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
3914                                           struct lfsck_component *com,
3915                                           struct lu_fid *cfid,
3916                                           struct lu_fid *pfid)
3917 {
3918         struct lfsck_instance    *lfsck  = com->lc_lfsck;
3919         struct obd_device        *obd    = lfsck->li_obd;
3920         struct seq_server_site   *ss     =
3921                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
3922         struct obd_export        *exp    = NULL;
3923         struct ptlrpc_request    *req    = NULL;
3924         struct lfsck_request     *lr;
3925         struct lu_seq_range       range  = { 0 };
3926         int                       rc     = 0;
3927         ENTRY;
3928
3929         if (unlikely(fid_is_idif(pfid)))
3930                 RETURN(1);
3931
3932         fld_range_set_any(&range);
3933         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
3934         if (rc != 0)
3935                 RETURN(rc == -ENOENT ? 1 : rc);
3936
3937         if (unlikely(!fld_range_is_mdt(&range)))
3938                 RETURN(1);
3939
3940         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
3941         if (unlikely(exp == NULL))
3942                 RETURN(1);
3943
3944         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
3945                 GOTO(out, rc = -EOPNOTSUPP);
3946
3947         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3948         if (req == NULL)
3949                 GOTO(out, rc = -ENOMEM);
3950
3951         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3952         if (rc != 0) {
3953                 ptlrpc_request_free(req);
3954
3955                 GOTO(out, rc);
3956         }
3957
3958         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3959         memset(lr, 0, sizeof(*lr));
3960         lr->lr_event = LE_PAIRS_VERIFY;
3961         lr->lr_active = LT_LAYOUT;
3962         lr->lr_fid = *cfid; /* OST-object itself FID. */
3963         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
3964
3965         ptlrpc_request_set_replen(req);
3966         rc = ptlrpc_queue_wait(req);
3967         ptlrpc_req_finished(req);
3968
3969         if (rc == -ENOENT || rc == -ENODATA)
3970                 rc = 1;
3971
3972         GOTO(out, rc);
3973
3974 out:
3975         if (exp != NULL)
3976                 class_export_put(exp);
3977
3978         return rc;
3979 }
3980
3981 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
3982                                           struct lfsck_component *com,
3983                                           struct lfsck_request *lr)
3984 {
3985         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3986         struct filter_fid               *ff     = &info->lti_new_pfid;
3987         struct lu_buf                   *buf;
3988         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
3989         struct dt_object                *obj;
3990         struct thandle                  *th     = NULL;
3991         int                              rc     = 0;
3992         ENTRY;
3993
3994         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
3995         if (IS_ERR(obj))
3996                 RETURN(PTR_ERR(obj));
3997
3998         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
3999         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4000         dt_write_lock(env, obj, 0);
4001         if (unlikely(!dt_object_exists(obj)))
4002                 GOTO(unlock, rc = 0);
4003
4004         th = dt_trans_create(env, dev);
4005         if (IS_ERR(th))
4006                 GOTO(unlock, rc = PTR_ERR(th));
4007
4008         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4009         if (rc != 0)
4010                 GOTO(stop, rc);
4011
4012         rc = dt_trans_start_local(env, dev, th);
4013         if (rc != 0)
4014                 GOTO(stop, rc);
4015
4016         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4017
4018         GOTO(stop, rc);
4019
4020 stop:
4021         dt_trans_stop(env, dev, th);
4022
4023 unlock:
4024         dt_write_unlock(env, obj);
4025         lu_object_put(env, &obj->do_lu);
4026
4027         return rc;
4028 }
4029
4030 /* layout APIs */
4031
4032 static int lfsck_layout_reset(const struct lu_env *env,
4033                               struct lfsck_component *com, bool init)
4034 {
4035         struct lfsck_layout     *lo    = com->lc_file_ram;
4036         int                      rc;
4037
4038         down_write(&com->lc_sem);
4039         if (init) {
4040                 memset(lo, 0, com->lc_file_size);
4041         } else {
4042                 __u32 count = lo->ll_success_count;
4043                 __u64 last_time = lo->ll_time_last_complete;
4044
4045                 memset(lo, 0, com->lc_file_size);
4046                 lo->ll_success_count = count;
4047                 lo->ll_time_last_complete = last_time;
4048         }
4049
4050         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4051         lo->ll_status = LS_INIT;
4052
4053         rc = lfsck_layout_store(env, com);
4054         up_write(&com->lc_sem);
4055
4056         return rc;
4057 }
4058
4059 static void lfsck_layout_fail(const struct lu_env *env,
4060                               struct lfsck_component *com, bool new_checked)
4061 {
4062         struct lfsck_layout *lo = com->lc_file_ram;
4063
4064         down_write(&com->lc_sem);
4065         if (new_checked)
4066                 com->lc_new_checked++;
4067         lo->ll_objs_failed_phase1++;
4068         if (lo->ll_pos_first_inconsistent == 0) {
4069                 struct lfsck_instance *lfsck = com->lc_lfsck;
4070
4071                 lo->ll_pos_first_inconsistent =
4072                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
4073                                                         lfsck->li_di_oit);
4074         }
4075         up_write(&com->lc_sem);
4076 }
4077
4078 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4079                                           struct lfsck_component *com, bool init)
4080 {
4081         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4082         struct lfsck_layout             *lo      = com->lc_file_ram;
4083         struct lfsck_layout_master_data *llmd    = com->lc_data;
4084         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4085         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4086         struct l_wait_info               lwi     = { 0 };
4087         int                              rc;
4088
4089         if (com->lc_new_checked == 0 && !init)
4090                 return 0;
4091
4092         l_wait_event(mthread->t_ctl_waitq,
4093                      list_empty(&llmd->llmd_req_list) ||
4094                      !thread_is_running(mthread) ||
4095                      thread_is_stopped(athread),
4096                      &lwi);
4097
4098         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4099                 return 0;
4100
4101         down_write(&com->lc_sem);
4102         if (init) {
4103                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4104         } else {
4105                 lo->ll_pos_last_checkpoint =
4106                                         lfsck->li_pos_current.lp_oit_cookie;
4107                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4108                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4109                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4110                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4111                 com->lc_new_checked = 0;
4112         }
4113
4114         rc = lfsck_layout_store(env, com);
4115         up_write(&com->lc_sem);
4116
4117         return rc;
4118 }
4119
4120 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4121                                          struct lfsck_component *com, bool init)
4122 {
4123         struct lfsck_instance   *lfsck = com->lc_lfsck;
4124         struct lfsck_layout     *lo    = com->lc_file_ram;
4125         int                      rc;
4126
4127         if (com->lc_new_checked == 0 && !init)
4128                 return 0;
4129
4130         down_write(&com->lc_sem);
4131
4132         if (init) {
4133                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4134         } else {
4135                 lo->ll_pos_last_checkpoint =
4136                                         lfsck->li_pos_current.lp_oit_cookie;
4137                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4138                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4139                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4140                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4141                 com->lc_new_checked = 0;
4142         }
4143
4144         rc = lfsck_layout_store(env, com);
4145
4146         up_write(&com->lc_sem);
4147
4148         return rc;
4149 }
4150
4151 static int lfsck_layout_prep(const struct lu_env *env,
4152                              struct lfsck_component *com,
4153                              struct lfsck_start *start)
4154 {
4155         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4156         struct lfsck_layout     *lo     = com->lc_file_ram;
4157         struct lfsck_position   *pos    = &com->lc_pos_start;
4158
4159         fid_zero(&pos->lp_dir_parent);
4160         pos->lp_dir_cookie = 0;
4161         if (lo->ll_status == LS_COMPLETED ||
4162             lo->ll_status == LS_PARTIAL ||
4163             /* To handle orphan, must scan from the beginning. */
4164             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4165                 int rc;
4166
4167                 rc = lfsck_layout_reset(env, com, false);
4168                 if (rc != 0)
4169                         return rc;
4170         }
4171
4172         down_write(&com->lc_sem);
4173         lo->ll_time_latest_start = cfs_time_current_sec();
4174         spin_lock(&lfsck->li_lock);
4175         if (lo->ll_flags & LF_SCANNED_ONCE) {
4176                 if (!lfsck->li_drop_dryrun ||
4177                     lo->ll_pos_first_inconsistent == 0) {
4178                         lo->ll_status = LS_SCANNING_PHASE2;
4179                         list_del_init(&com->lc_link);
4180                         list_add_tail(&com->lc_link,
4181                                       &lfsck->li_list_double_scan);
4182                         pos->lp_oit_cookie = 0;
4183                 } else {
4184                         int i;
4185
4186                         lo->ll_status = LS_SCANNING_PHASE1;
4187                         lo->ll_run_time_phase1 = 0;
4188                         lo->ll_run_time_phase2 = 0;
4189                         lo->ll_objs_checked_phase1 = 0;
4190                         lo->ll_objs_checked_phase2 = 0;
4191                         lo->ll_objs_failed_phase1 = 0;
4192                         lo->ll_objs_failed_phase2 = 0;
4193                         for (i = 0; i < LLIT_MAX; i++)
4194                                 lo->ll_objs_repaired[i] = 0;
4195
4196                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4197                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4198                 }
4199         } else {
4200                 lo->ll_status = LS_SCANNING_PHASE1;
4201                 if (!lfsck->li_drop_dryrun ||
4202                     lo->ll_pos_first_inconsistent == 0)
4203                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4204                 else
4205                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4206         }
4207         spin_unlock(&lfsck->li_lock);
4208         up_write(&com->lc_sem);
4209
4210         return 0;
4211 }
4212
4213 static int lfsck_layout_slave_prep(const struct lu_env *env,
4214                                    struct lfsck_component *com,
4215                                    struct lfsck_start_param *lsp)
4216 {
4217         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4218         struct lfsck_start              *start  = lsp->lsp_start;
4219         int                              rc;
4220
4221         rc = lfsck_layout_prep(env, com, start);
4222         if (rc != 0 || !lsp->lsp_index_valid)
4223                 return rc;
4224
4225         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4226         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4227                 LASSERT(!llsd->llsd_rbtree_valid);
4228
4229                 write_lock(&llsd->llsd_rb_lock);
4230                 rc = lfsck_rbtree_setup(env, com);
4231                 write_unlock(&llsd->llsd_rb_lock);
4232         }
4233
4234         return rc;
4235 }
4236
4237 static int lfsck_layout_master_prep(const struct lu_env *env,
4238                                     struct lfsck_component *com,
4239                                     struct lfsck_start_param *lsp)
4240 {
4241         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4242         struct lfsck_layout_master_data *llmd    = com->lc_data;
4243         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4244         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4245         struct lfsck_thread_args        *lta;
4246         long                             rc;
4247         ENTRY;
4248
4249         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4250         if (rc != 0)
4251                 RETURN(rc);
4252
4253         llmd->llmd_assistant_status = 0;
4254         llmd->llmd_post_result = 0;
4255         llmd->llmd_to_post = 0;
4256         llmd->llmd_to_double_scan = 0;
4257         llmd->llmd_in_double_scan = 0;
4258         llmd->llmd_exit = 0;
4259         thread_set_flags(athread, 0);
4260
4261         lta = lfsck_thread_args_init(lfsck, com, lsp);
4262         if (IS_ERR(lta))
4263                 RETURN(PTR_ERR(lta));
4264
4265         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
4266         if (IS_ERR_VALUE(rc)) {
4267                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
4268                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
4269                 lfsck_thread_args_fini(lta);
4270         } else {
4271                 struct l_wait_info lwi = { 0 };
4272
4273                 l_wait_event(mthread->t_ctl_waitq,
4274                              thread_is_running(athread) ||
4275                              thread_is_stopped(athread),
4276                              &lwi);
4277                 if (unlikely(!thread_is_running(athread)))
4278                         rc = llmd->llmd_assistant_status;
4279                 else
4280                         rc = 0;
4281         }
4282
4283         RETURN(rc);
4284 }
4285
4286 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4287 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4288                                      struct lfsck_component *com,
4289                                      struct dt_object *parent,
4290                                      struct lov_mds_md_v1 *lmm)
4291 {
4292         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4293         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4294         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4295         struct lfsck_layout             *lo      = com->lc_file_ram;
4296         struct lfsck_layout_master_data *llmd    = com->lc_data;
4297         struct lfsck_layout_object      *llo     = NULL;
4298         struct lov_ost_data_v1          *objs;
4299         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4300         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4301         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4302                 struct l_wait_info       lwi     = { 0 };
4303         struct lu_buf                   *buf;
4304         int                              rc      = 0;
4305         int                              i;
4306         __u32                            magic;
4307         __u16                            count;
4308         __u16                            gen;
4309         ENTRY;
4310
4311         buf = lfsck_buf_get(env, &info->lti_old_pfid,
4312                             sizeof(struct filter_fid_old));
4313         count = le16_to_cpu(lmm->lmm_stripe_count);
4314         gen = le16_to_cpu(lmm->lmm_layout_gen);
4315         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4316          * been verified in lfsck_layout_verify_header() already. If some
4317          * new magic introduced in the future, then layout LFSCK needs to
4318          * be updated also. */
4319         magic = le32_to_cpu(lmm->lmm_magic);
4320         if (magic == LOV_MAGIC_V1) {
4321                 objs = &(lmm->lmm_objects[0]);
4322         } else {
4323                 LASSERT(magic == LOV_MAGIC_V3);
4324                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4325         }
4326
4327         for (i = 0; i < count; i++, objs++) {
4328                 struct lu_fid           *fid    = &info->lti_fid;
4329                 struct ost_id           *oi     = &info->lti_oi;
4330                 struct lfsck_layout_req *llr;
4331                 struct lfsck_tgt_desc   *tgt    = NULL;
4332                 struct dt_object        *cobj   = NULL;
4333                 __u32                    index  =
4334                                         le32_to_cpu(objs->l_ost_idx);
4335                 bool                     wakeup = false;
4336
4337                 if (is_dummy_lov_ost_data(objs))
4338                         continue;
4339
4340                 l_wait_event(mthread->t_ctl_waitq,
4341                              bk->lb_async_windows == 0 ||
4342                              llmd->llmd_prefetched < bk->lb_async_windows ||
4343                              !thread_is_running(mthread) ||
4344                              thread_is_stopped(athread),
4345                              &lwi);
4346
4347                 if (unlikely(!thread_is_running(mthread)) ||
4348                              thread_is_stopped(athread))
4349                         GOTO(out, rc = 0);
4350
4351                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4352                 ostid_to_fid(fid, oi, index);
4353                 tgt = lfsck_tgt_get(ltds, index);
4354                 if (unlikely(tgt == NULL)) {
4355                         CERROR("%s: Cannot talk with OST %x which did not join "
4356                                "the layout LFSCK.\n",
4357                                lfsck_lfsck2name(lfsck), index);
4358                         lo->ll_flags |= LF_INCOMPLETE;
4359                         goto next;
4360                 }
4361
4362                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
4363                 if (IS_ERR(cobj)) {
4364                         rc = PTR_ERR(cobj);
4365                         goto next;
4366                 }
4367
4368                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4369                 if (rc != 0)
4370                         goto next;
4371
4372                 rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
4373                                           BYPASS_CAPA);
4374                 if (rc != 0)
4375                         goto next;
4376
4377                 if (llo == NULL) {
4378                         llo = lfsck_layout_object_init(env, parent, gen);
4379                         if (IS_ERR(llo)) {
4380                                 rc = PTR_ERR(llo);
4381                                 goto next;
4382                         }
4383                 }
4384
4385                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4386                 if (IS_ERR(llr)) {
4387                         rc = PTR_ERR(llr);
4388                         goto next;
4389                 }
4390
4391                 cobj = NULL;
4392                 spin_lock(&llmd->llmd_lock);
4393                 if (llmd->llmd_assistant_status < 0) {
4394                         spin_unlock(&llmd->llmd_lock);
4395                         lfsck_layout_req_fini(env, llr);
4396                         lfsck_tgt_put(tgt);
4397                         RETURN(llmd->llmd_assistant_status);
4398                 }
4399
4400                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4401                 if (llmd->llmd_prefetched == 0)
4402                         wakeup = true;
4403
4404                 llmd->llmd_prefetched++;
4405                 spin_unlock(&llmd->llmd_lock);
4406                 if (wakeup)
4407                         wake_up_all(&athread->t_ctl_waitq);
4408
4409 next:
4410                 down_write(&com->lc_sem);
4411                 com->lc_new_checked++;
4412                 if (rc < 0)
4413                         lo->ll_objs_failed_phase1++;
4414                 up_write(&com->lc_sem);
4415
4416                 if (cobj != NULL && !IS_ERR(cobj))
4417                         lu_object_put(env, &cobj->do_lu);
4418
4419                 if (likely(tgt != NULL))
4420                         lfsck_tgt_put(tgt);
4421
4422                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4423                         GOTO(out, rc);
4424         }
4425
4426         GOTO(out, rc = 0);
4427
4428 out:
4429         if (llo != NULL && !IS_ERR(llo))
4430                 lfsck_layout_object_put(env, llo);
4431
4432         return rc;
4433 }
4434
/* For the given object, read its layout EA locally. For each stripe, pre-fetch
 * the OST-object's attribute and generate a lfsck_layout_req structure on the
 * list ::llmd_req_list.
 *
 * For each request on the above list, the lfsck_layout_assistant thread
 * compares the OST side attribute with the local attribute and, if they are
 * inconsistent, repairs it.
 *
 * All of the above processing is asynchronous and pipelined. */
4443 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4444                                         struct lfsck_component *com,
4445                                         struct dt_object *obj)
4446 {
4447         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4448         struct ost_id                   *oi     = &info->lti_oi;
4449         struct lfsck_layout             *lo     = com->lc_file_ram;
4450         struct lfsck_layout_master_data *llmd   = com->lc_data;
4451         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4452         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4453         struct thandle                  *handle = NULL;
4454         struct lu_buf                   *buf    = &info->lti_big_buf;
4455         struct lov_mds_md_v1            *lmm    = NULL;
4456         struct dt_device                *dev    = lfsck->li_bottom;
4457         struct lustre_handle             lh     = { 0 };
4458         ssize_t                          buflen = buf->lb_len;
4459         int                              rc     = 0;
4460         bool                             locked = false;
4461         bool                             stripe = false;
4462         ENTRY;
4463
             /* Handle one object handed over by the object-table iteration.
              *
              * Anything that is not a regular file is only counted as checked.
              * For a regular file the LOV EA is read and its header verified,
              * the stored lmm_oi is compared with the value derived from the
              * object's FID (and rewritten on mismatch unless LPF_DRYRUN is
              * set), and finally lfsck_layout_scan_stripes() is called.
              *
              * buflen (taken from buf->lb_len above) is used to put
              * buf->lb_len back before returning, because lb_len is
              * overwritten with the EA size below. */
4464         if (!S_ISREG(lfsck_object_type(obj)))
4465                 GOTO(out, rc = 0);
4466
             /* A negative assistant status fails this object with -ESRCH; it
              * is then counted in ll_objs_failed_phase1 at 'out'. */
4467         if (llmd->llmd_assistant_status < 0)
4468                 GOTO(out, rc = -ESRCH);
4469
             /* Build the lmm_oi expected for this FID and pass it through
              * lmm_oi_cpu_to_le(), so that it can be memcmp()'d against
              * lmm->lmm_oi below. */
4470         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4471         lmm_oi_cpu_to_le(oi, oi);
4472         dt_read_lock(env, obj, 0);
4473         locked = true;
4474
             /* Reached a second time, with the write lock held, once the
              * repair transaction below has been started. */
4475 again:
4476         rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
             /* Negative: error. Zero: presumably there is no LOV EA to
              * check -- TODO confirm against lfsck_layout_get_lovea(). */
4477         if (rc <= 0)
4478                 GOTO(out, rc);
4479
4480         buf->lb_len = rc;
4481         lmm = buf->lb_buf;
4482         rc = lfsck_layout_verify_header(lmm);
4483         if (rc != 0)
4484                 GOTO(out, rc);
4485
             /* lmm_oi matches: nothing to repair, go on to scan the stripes. */
4486         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4487                 GOTO(out, stripe = true);
4488
4489         /* Inconsistent lmm_oi, should be repaired. */
4490         CDEBUG(D_LFSCK, "Repair bad lmm_oi for "DFID"\n",
4491                PFID(lfsck_dto2fid(obj)));
4492
             /* Dry-run: account for the repair without touching the EA. */
4493         if (bk->lb_param & LPF_DRYRUN) {
4494                 down_write(&com->lc_sem);
4495                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4496                 up_write(&com->lc_sem);
4497
4498                 GOTO(out, stripe = true);
4499         }
4500
             /* First pass only (lh is not in use yet): give up the read lock,
              * take the LAYOUT|XATTR ibits lock, declare and start a local
              * transaction that replaces the LOV EA, take the write lock and
              * re-read the EA from 'again'. */
4501         if (!lustre_handle_is_used(&lh)) {
4502                 dt_read_unlock(env, obj);
4503                 locked = false;
4504                 buf->lb_len = buflen;
4505                 rc = lfsck_layout_lock(env, com, obj, &lh,
4506                                        MDS_INODELOCK_LAYOUT |
4507                                        MDS_INODELOCK_XATTR);
4508                 if (rc != 0)
4509                         GOTO(out, rc);
4510
4511                 handle = dt_trans_create(env, dev);
4512                 if (IS_ERR(handle))
4513                         GOTO(out, rc = PTR_ERR(handle));
4514
4515                 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4516                                           LU_XATTR_REPLACE, handle);
4517                 if (rc != 0)
4518                         GOTO(out, rc);
4519
4520                 rc = dt_trans_start_local(env, dev, handle);
4521                 if (rc != 0)
4522                         GOTO(out, rc);
4523
4524                 dt_write_lock(env, obj, 0);
4525                 locked = true;
4526
4527                 goto again;
4528         }
4529
             /* Second pass: the write lock and the transaction are in place,
              * so store the corrected lmm_oi. */
4530         lmm->lmm_oi = *oi;
4531         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4532                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4533         if (rc != 0)
4534                 GOTO(out, rc);
4535
4536         down_write(&com->lc_sem);
4537         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4538         up_write(&com->lc_sem);
4539
4540         GOTO(out, stripe = true);
4541
4542 out:
             /* lh is only in use on the repair path, and there the dt lock
              * taken afterwards is the write lock; in every other case the
              * lock still held is the initial read lock. */
4543         if (locked) {
4544                 if (lustre_handle_is_used(&lh))
4545                         dt_write_unlock(env, obj);
4546                 else
4547                         dt_read_unlock(env, obj);
4548         }
4549
             /* handle may be NULL (no repair attempted) or an ERR_PTR (create
              * failed); neither must be passed to dt_trans_stop(). */
4550         if (handle != NULL && !IS_ERR(handle))
4551                 dt_trans_stop(env, dev, handle);
4552
             /* Called unconditionally; presumably a no-op for a handle that
              * was never used -- TODO confirm in lfsck_layout_unlock(). */
4553         lfsck_layout_unlock(&lh);
             /* With stripe set, rc becomes the result of the stripe scan.
              * Otherwise the object is accounted for here, and a negative rc
              * is counted as a phase-1 failure. */
4554         if (stripe) {
4555                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4556         } else {
4557                 down_write(&com->lc_sem);
4558                 com->lc_new_checked++;
4559                 if (rc < 0)
4560                         lo->ll_objs_failed_phase1++;
4561                 up_write(&com->lc_sem);
4562         }
4563         buf->lb_len = buflen;
4564
4565         return rc;
4566 }
4567
4568 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4569                                        struct lfsck_component *com,
4570                                        struct dt_object *obj)
4571 {
4572         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4573         struct lfsck_layout             *lo     = com->lc_file_ram;
4574         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4575         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4576         struct lfsck_layout_seq         *lls;
4577         __u64                            seq;
4578         __u64                            oid;
4579         int                              rc;
4580         ENTRY;
4581
4582         LASSERT(llsd != NULL);
4583
4584         lfsck_rbtree_update_bitmap(env, com, fid, false);
4585
4586         down_write(&com->lc_sem);
4587         if (fid_is_idif(fid))
4588                 seq = 0;
4589         else if (!fid_is_norm(fid) ||
4590                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4591                 GOTO(unlock, rc = 0);
4592         else
4593                 seq = fid_seq(fid);
4594         com->lc_new_checked++;
4595
4596         lls = lfsck_layout_seq_lookup(llsd, seq);
4597         if (lls == NULL) {
4598                 OBD_ALLOC_PTR(lls);
4599                 if (unlikely(lls == NULL))
4600                         GOTO(unlock, rc = -ENOMEM);
4601
4602                 INIT_LIST_HEAD(&lls->lls_list);
4603                 lls->lls_seq = seq;
4604                 rc = lfsck_layout_lastid_load(env, com, lls);
4605                 if (rc != 0) {
4606                         lo->ll_objs_failed_phase1++;
4607                         OBD_FREE_PTR(lls);
4608                         GOTO(unlock, rc);
4609                 }
4610
4611                 lfsck_layout_seq_insert(llsd, lls);
4612         }
4613
4614         if (unlikely(fid_is_last_id(fid)))
4615                 GOTO(unlock, rc = 0);
4616
4617         oid = fid_oid(fid);
4618         if (oid > lls->lls_lastid_known)
4619                 lls->lls_lastid_known = oid;
4620
4621         if (oid > lls->lls_lastid) {
4622                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4623                         /* OFD may create new objects during LFSCK scanning. */
4624                         rc = lfsck_layout_lastid_reload(env, com, lls);
4625                         if (unlikely(rc != 0))
4626                                 CWARN("%s: failed to reload LAST_ID for "LPX64
4627                                       ": rc = %d\n",
4628                                       lfsck_lfsck2name(com->lc_lfsck),
4629                                       lls->lls_seq, rc);
4630                         if (oid <= lls->lls_lastid)
4631                                 GOTO(unlock, rc = 0);
4632
4633                         LASSERT(lfsck->li_out_notify != NULL);
4634
4635                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4636                                              LE_LASTID_REBUILDING);
4637                         lo->ll_flags |= LF_CRASHED_LASTID;
4638                 }
4639
4640                 lls->lls_lastid = oid;
4641                 lls->lls_dirty = 1;
4642         }
4643
4644         GOTO(unlock, rc = 0);
4645
4646 unlock:
4647         up_write(&com->lc_sem);
4648
4649         return rc;
4650 }
4651
/* Directory-entry hook of the layout component (going by its name and the
 * lu_dirent argument): every argument is ignored and 0 is always returned. */
4652 static int lfsck_layout_exec_dir(const struct lu_env *env,
4653                                  struct lfsck_component *com,
4654                                  struct dt_object *obj,
4655                                  struct lu_dirent *ent)
4656 {
4657         return 0;
4658 }
4659
4660 static int lfsck_layout_master_post(const struct lu_env *env,
4661                                     struct lfsck_component *com,
4662                                     int result, bool init)
4663 {
4664         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4665         struct lfsck_layout             *lo      = com->lc_file_ram;
4666         struct lfsck_layout_master_data *llmd    = com->lc_data;
4667         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4668         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4669         struct l_wait_info               lwi     = { 0 };
4670         int                              rc;
4671         ENTRY;
4672
4673
4674         llmd->llmd_post_result = result;
4675         llmd->llmd_to_post = 1;
4676         if (llmd->llmd_post_result <= 0)
4677                 llmd->llmd_exit = 1;
4678
4679         wake_up_all(&athread->t_ctl_waitq);
4680         l_wait_event(mthread->t_ctl_waitq,
4681                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
4682                      thread_is_stopped(athread),
4683                      &lwi);
4684
4685         if (llmd->llmd_assistant_status < 0)
4686                 result = llmd->llmd_assistant_status;
4687
4688         down_write(&com->lc_sem);
4689         spin_lock(&lfsck->li_lock);
4690         /* When LFSCK failed, there may be some prefetched objects those are
4691          * not been processed yet, we do not know the exactly position, then
4692          * just restart from last check-point next time. */
4693         if (!init && !llmd->llmd_exit)
4694                 lo->ll_pos_last_checkpoint =
4695                                         lfsck->li_pos_current.lp_oit_cookie;
4696
4697         if (result > 0) {
4698                 lo->ll_status = LS_SCANNING_PHASE2;
4699                 lo->ll_flags |= LF_SCANNED_ONCE;
4700                 lo->ll_flags &= ~LF_UPGRADE;
4701                 list_del_init(&com->lc_link);
4702                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4703         } else if (result == 0) {
4704                 lo->ll_status = lfsck->li_status;
4705                 if (lo->ll_status == 0)
4706                         lo->ll_status = LS_STOPPED;
4707                 if (lo->ll_status != LS_PAUSED) {
4708                         list_del_init(&com->lc_link);
4709                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4710                 }
4711         } else {
4712                 lo->ll_status = LS_FAILED;
4713                 list_del_init(&com->lc_link);
4714                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4715         }
4716         spin_unlock(&lfsck->li_lock);
4717
4718         if (!init) {
4719                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4720                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4721                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4722                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4723                 com->lc_new_checked = 0;
4724         }
4725
4726         rc = lfsck_layout_store(env, com);
4727         up_write(&com->lc_sem);
4728
4729         RETURN(rc);
4730 }
4731
4732 static int lfsck_layout_slave_post(const struct lu_env *env,
4733                                    struct lfsck_component *com,
4734                                    int result, bool init)
4735 {
4736         struct lfsck_instance   *lfsck = com->lc_lfsck;
4737         struct lfsck_layout     *lo    = com->lc_file_ram;
4738         int                      rc;
4739         bool                     done  = false;
4740
4741         rc = lfsck_layout_lastid_store(env, com);
4742         if (rc != 0)
4743                 result = rc;
4744
4745         LASSERT(lfsck->li_out_notify != NULL);
4746
4747         down_write(&com->lc_sem);
4748
4749         spin_lock(&lfsck->li_lock);
4750         if (!init)
4751                 lo->ll_pos_last_checkpoint =
4752                                         lfsck->li_pos_current.lp_oit_cookie;
4753         if (result > 0) {
4754                 lo->ll_status = LS_SCANNING_PHASE2;
4755                 lo->ll_flags |= LF_SCANNED_ONCE;
4756                 if (lo->ll_flags & LF_CRASHED_LASTID) {
4757                         done = true;
4758                         lo->ll_flags &= ~LF_CRASHED_LASTID;
4759                 }
4760                 lo->ll_flags &= ~LF_UPGRADE;
4761                 list_del_init(&com->lc_link);
4762                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4763         } else if (result == 0) {
4764                 lo->ll_status = lfsck->li_status;
4765                 if (lo->ll_status == 0)
4766                         lo->ll_status = LS_STOPPED;
4767                 if (lo->ll_status != LS_PAUSED) {
4768                         list_del_init(&com->lc_link);
4769                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4770                 }
4771         } else {
4772                 lo->ll_status = LS_FAILED;
4773                 list_del_init(&com->lc_link);
4774                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4775         }
4776         spin_unlock(&lfsck->li_lock);
4777
4778         if (done)
4779                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4780                                      LE_LASTID_REBUILT);
4781
4782         if (!init) {
4783                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4784                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4785                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4786                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4787                 com->lc_new_checked = 0;
4788         }
4789
4790         rc = lfsck_layout_store(env, com);
4791
4792         up_write(&com->lc_sem);
4793
4794         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4795
4796         if (result <= 0)
4797                 lfsck_rbtree_cleanup(env, com);
4798
4799         return rc;
4800 }
4801
4802 static int lfsck_layout_dump(const struct lu_env *env,
4803                              struct lfsck_component *com, char *buf, int len)
4804 {
4805         struct lfsck_instance   *lfsck = com->lc_lfsck;
4806         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
4807         struct lfsck_layout     *lo    = com->lc_file_ram;
4808         int                      save  = len;
4809         int                      ret   = -ENOSPC;
4810         int                      rc;
4811
4812         down_read(&com->lc_sem);
4813         rc = snprintf(buf, len,
4814                       "name: lfsck_layout\n"
4815                       "magic: %#x\n"
4816                       "version: %d\n"
4817                       "status: %s\n",
4818                       lo->ll_magic,
4819                       bk->lb_version,
4820                       lfsck_status2names(lo->ll_status));
4821         if (rc <= 0)
4822                 goto out;
4823
4824         buf += rc;
4825         len -= rc;
4826         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
4827                              "flags");
4828         if (rc < 0)
4829                 goto out;
4830
4831         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
4832                              "param");
4833         if (rc < 0)
4834                 goto out;
4835
4836         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
4837                              "time_since_last_completed");
4838         if (rc < 0)
4839                 goto out;
4840
4841         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
4842                              "time_since_latest_start");
4843         if (rc < 0)
4844                 goto out;
4845
4846         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
4847                              "time_since_last_checkpoint");
4848         if (rc < 0)
4849                 goto out;
4850
4851         rc = snprintf(buf, len,
4852                       "latest_start_position: "LPU64"\n"
4853                       "last_checkpoint_position: "LPU64"\n"
4854                       "first_failure_position: "LPU64"\n",
4855                       lo->ll_pos_latest_start,
4856                       lo->ll_pos_last_checkpoint,
4857                       lo->ll_pos_first_inconsistent);
4858         if (rc <= 0)
4859                 goto out;
4860
4861         buf += rc;
4862         len -= rc;
4863
4864         rc = snprintf(buf, len,
4865                       "success_count: %u\n"
4866                       "repaired_dangling: "LPU64"\n"
4867                       "repaired_unmatched_pair: "LPU64"\n"
4868                       "repaired_multiple_referenced: "LPU64"\n"
4869                       "repaired_orphan: "LPU64"\n"
4870                       "repaired_inconsistent_owner: "LPU64"\n"
4871                       "repaired_others: "LPU64"\n"
4872                       "skipped: "LPU64"\n"
4873                       "failed_phase1: "LPU64"\n"
4874                       "failed_phase2: "LPU64"\n",
4875                       lo->ll_success_count,
4876                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
4877                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4878                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4879                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4880                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4881                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
4882                       lo->ll_objs_skipped,
4883                       lo->ll_objs_failed_phase1,
4884                       lo->ll_objs_failed_phase2);
4885         if (rc <= 0)
4886                 goto out;
4887
4888         buf += rc;
4889         len -= rc;
4890
4891         if (lo->ll_status == LS_SCANNING_PHASE1) {
4892                 __u64 pos;
4893                 const struct dt_it_ops *iops;
4894                 cfs_duration_t duration = cfs_time_current() -
4895                                           lfsck->li_time_last_checkpoint;
4896                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4897                 __u64 speed = checked;
4898                 __u64 new_checked = com->lc_new_checked * HZ;
4899                 __u32 rtime = lo->ll_run_time_phase1 +
4900                               cfs_duration_sec(duration + HALF_SEC);
4901
4902                 if (duration != 0)
4903                         do_div(new_checked, duration);
4904                 if (rtime != 0)
4905                         do_div(speed, rtime);
4906                 rc = snprintf(buf, len,
4907                               "checked_phase1: "LPU64"\n"
4908                               "checked_phase2: "LPU64"\n"
4909                               "run_time_phase1: %u seconds\n"
4910                               "run_time_phase2: %u seconds\n"
4911                               "average_speed_phase1: "LPU64" items/sec\n"
4912                               "average_speed_phase2: N/A\n"
4913                               "real-time_speed_phase1: "LPU64" items/sec\n"
4914                               "real-time_speed_phase2: N/A\n",
4915                               checked,
4916                               lo->ll_objs_checked_phase2,
4917                               rtime,
4918                               lo->ll_run_time_phase2,
4919                               speed,
4920                               new_checked);
4921                 if (rc <= 0)
4922                         goto out;
4923
4924                 buf += rc;
4925                 len -= rc;
4926
4927                 LASSERT(lfsck->li_di_oit != NULL);
4928
4929                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4930
4931                 /* The low layer otable-based iteration position may NOT
4932                  * exactly match the layout-based directory traversal
4933                  * cookie. Generally, it is not a serious issue. But the
4934                  * caller should NOT make assumption on that. */
4935                 pos = iops->store(env, lfsck->li_di_oit);
4936                 if (!lfsck->li_current_oit_processed)
4937                         pos--;
4938                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
4939                 if (rc <= 0)
4940                         goto out;
4941
4942                 buf += rc;
4943                 len -= rc;
4944         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4945                 cfs_duration_t duration = cfs_time_current() -
4946                                           lfsck->li_time_last_checkpoint;
4947                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4948                 __u64 speed = checked;
4949                 __u64 new_checked = com->lc_new_checked * HZ;
4950                 __u32 rtime = lo->ll_run_time_phase1 +
4951                               cfs_duration_sec(duration + HALF_SEC);
4952
4953                 if (duration != 0)
4954                         do_div(new_checked, duration);
4955                 if (rtime != 0)
4956                         do_div(speed, rtime);
4957                 rc = snprintf(buf, len,
4958                               "checked_phase1: "LPU64"\n"
4959                               "checked_phase2: "LPU64"\n"
4960                               "run_time_phase1: %u seconds\n"
4961                               "run_time_phase2: %u seconds\n"
4962                               "average_speed_phase1: "LPU64" items/sec\n"
4963                               "average_speed_phase2: N/A\n"
4964                               "real-time_speed_phase1: "LPU64" items/sec\n"
4965                               "real-time_speed_phase2: N/A\n"
4966                               "current_position: "DFID"\n",
4967                               checked,
4968                               lo->ll_objs_checked_phase2,
4969                               rtime,
4970                               lo->ll_run_time_phase2,
4971                               speed,
4972                               new_checked,
4973                               PFID(&com->lc_fid_latest_scanned_phase2));
4974                 if (rc <= 0)
4975                         goto out;
4976
4977                 buf += rc;
4978                 len -= rc;
4979         } else {
4980                 __u64 speed1 = lo->ll_objs_checked_phase1;
4981                 __u64 speed2 = lo->ll_objs_checked_phase2;
4982
4983                 if (lo->ll_run_time_phase1 != 0)
4984                         do_div(speed1, lo->ll_run_time_phase1);
4985                 if (lo->ll_run_time_phase2 != 0)
4986                         do_div(speed2, lo->ll_run_time_phase2);
4987                 rc = snprintf(buf, len,
4988                               "checked_phase1: "LPU64"\n"
4989                               "checked_phase2: "LPU64"\n"
4990                               "run_time_phase1: %u seconds\n"
4991                               "run_time_phase2: %u seconds\n"
4992                               "average_speed_phase1: "LPU64" items/sec\n"
4993                               "average_speed_phase2: "LPU64" objs/sec\n"
4994                               "real-time_speed_phase1: N/A\n"
4995                               "real-time_speed_phase2: N/A\n"
4996                               "current_position: N/A\n",
4997                               lo->ll_objs_checked_phase1,
4998                               lo->ll_objs_checked_phase2,
4999                               lo->ll_run_time_phase1,
5000                               lo->ll_run_time_phase2,
5001                               speed1,
5002                               speed2);
5003                 if (rc <= 0)
5004                         goto out;
5005
5006                 buf += rc;
5007                 len -= rc;
5008         }
5009         ret = save - len;
5010
5011 out:
5012         up_read(&com->lc_sem);
5013
5014         return ret;
5015 }
5016
5017 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5018                                            struct lfsck_component *com)
5019 {
5020         struct lfsck_layout_master_data *llmd    = com->lc_data;
5021         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5022         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5023         struct lfsck_layout             *lo      = com->lc_file_ram;
5024         struct l_wait_info               lwi     = { 0 };
5025
5026         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5027                 return 0;
5028
5029         llmd->llmd_to_double_scan = 1;
5030         wake_up_all(&athread->t_ctl_waitq);
5031         l_wait_event(mthread->t_ctl_waitq,
5032                      llmd->llmd_in_double_scan ||
5033                      thread_is_stopped(athread),
5034                      &lwi);
5035         if (llmd->llmd_assistant_status < 0)
5036                 return llmd->llmd_assistant_status;
5037
5038         return 0;
5039 }
5040
5041 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5042                                           struct lfsck_component *com)
5043 {
5044         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5045         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5046         struct lfsck_layout             *lo     = com->lc_file_ram;
5047         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5048         int                              rc;
5049         ENTRY;
5050
5051         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5052                 lfsck_rbtree_cleanup(env, com);
5053                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5054                 RETURN(0);
5055         }
5056
5057         atomic_inc(&lfsck->li_double_scan_count);
5058
5059         com->lc_new_checked = 0;
5060         com->lc_new_scanned = 0;
5061         com->lc_time_last_checkpoint = cfs_time_current();
5062         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5063                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5064
5065         while (1) {
5066                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5067                                                      NULL, NULL);
5068
5069                 rc = lfsck_layout_slave_query_master(env, com);
5070                 if (list_empty(&llsd->llsd_master_list)) {
5071                         if (unlikely(!thread_is_running(thread)))
5072                                 rc = 0;
5073                         else
5074                                 rc = 1;
5075
5076                         GOTO(done, rc);
5077                 }
5078
5079                 if (rc < 0)
5080                         GOTO(done, rc);
5081
5082                 rc = l_wait_event(thread->t_ctl_waitq,
5083                                   !thread_is_running(thread) ||
5084                                   list_empty(&llsd->llsd_master_list),
5085                                   &lwi);
5086                 if (unlikely(!thread_is_running(thread)))
5087                         GOTO(done, rc = 0);
5088
5089                 if (rc == -ETIMEDOUT)
5090                         continue;
5091
5092                 GOTO(done, rc = (rc < 0 ? rc : 1));
5093         }
5094
5095 done:
5096         rc = lfsck_layout_double_scan_result(env, com, rc);
5097
5098         lfsck_rbtree_cleanup(env, com);
5099         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5100         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5101                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5102
5103         return rc;
5104 }
5105
5106 static void lfsck_layout_master_data_release(const struct lu_env *env,
5107                                              struct lfsck_component *com)
5108 {
5109         struct lfsck_layout_master_data *llmd   = com->lc_data;
5110         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5111         struct lfsck_tgt_descs          *ltds;
5112         struct lfsck_tgt_desc           *ltd;
5113         struct lfsck_tgt_desc           *next;
5114
5115         LASSERT(llmd != NULL);
5116         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5117                 thread_is_stopped(&llmd->llmd_thread));
5118         LASSERT(list_empty(&llmd->llmd_req_list));
5119
5120         com->lc_data = NULL;
5121
5122         ltds = &lfsck->li_ost_descs;
5123         spin_lock(&ltds->ltd_lock);
5124         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5125                                  ltd_layout_phase_list) {
5126                 list_del_init(&ltd->ltd_layout_phase_list);
5127         }
5128         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5129                                  ltd_layout_phase_list) {
5130                 list_del_init(&ltd->ltd_layout_phase_list);
5131         }
5132         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5133                                  ltd_layout_list) {
5134                 list_del_init(&ltd->ltd_layout_list);
5135         }
5136         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5137                                  ltd_layout_phase_list) {
5138                 list_del_init(&ltd->ltd_layout_phase_list);
5139         }
5140         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5141                                  ltd_layout_phase_list) {
5142                 list_del_init(&ltd->ltd_layout_phase_list);
5143         }
5144         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5145                                  ltd_layout_list) {
5146                 list_del_init(&ltd->ltd_layout_list);
5147         }
5148         spin_unlock(&ltds->ltd_lock);
5149
5150         OBD_FREE_PTR(llmd);
5151 }
5152
5153 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5154                                             struct lfsck_component *com)
5155 {
5156         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5157         struct lfsck_layout_seq          *lls;
5158         struct lfsck_layout_seq          *next;
5159         struct lfsck_layout_slave_target *llst;
5160         struct lfsck_layout_slave_target *tmp;
5161
5162         LASSERT(llsd != NULL);
5163
5164         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5165                                      lls_list) {
5166                 list_del_init(&lls->lls_list);
5167                 lfsck_object_put(env, lls->lls_lastid_obj);
5168                 OBD_FREE_PTR(lls);
5169         }
5170
5171         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5172                                  llst_list) {
5173                 list_del_init(&llst->llst_list);
5174                 OBD_FREE_PTR(llst);
5175         }
5176
5177         lfsck_rbtree_cleanup(env, com);
5178         com->lc_data = NULL;
5179         OBD_FREE_PTR(llsd);
5180 }
5181
5182 static void lfsck_layout_master_quit(const struct lu_env *env,
5183                                      struct lfsck_component *com)
5184 {
5185         struct lfsck_layout_master_data *llmd    = com->lc_data;
5186         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5187         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5188         struct l_wait_info               lwi     = { 0 };
5189
5190         llmd->llmd_exit = 1;
5191         wake_up_all(&athread->t_ctl_waitq);
5192         l_wait_event(mthread->t_ctl_waitq,
5193                      thread_is_init(athread) ||
5194                      thread_is_stopped(athread),
5195                      &lwi);
5196 }
5197
/* Slave-side quit hook: the only action is a call to
 * lfsck_rbtree_cleanup() for this component. */
5198 static void lfsck_layout_slave_quit(const struct lu_env *env,
5199                                     struct lfsck_component *com)
5200 {
5201         lfsck_rbtree_cleanup(env, com);
5202 }
5203
5204 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5205                                          struct lfsck_component *com,
5206                                          struct lfsck_request *lr)
5207 {
5208         struct lfsck_instance           *lfsck = com->lc_lfsck;
5209         struct lfsck_layout             *lo    = com->lc_file_ram;
5210         struct lfsck_layout_master_data *llmd  = com->lc_data;
5211         struct lfsck_tgt_descs          *ltds;
5212         struct lfsck_tgt_desc           *ltd;
5213         bool                             fail  = false;
5214         ENTRY;
5215
5216         if (lr->lr_event == LE_PAIRS_VERIFY) {
5217                 int rc;
5218
5219                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5220                                                      &lr->lr_fid2);
5221
5222                 RETURN(rc);
5223         }
5224
5225         if (lr->lr_event != LE_PHASE1_DONE &&
5226             lr->lr_event != LE_PHASE2_DONE &&
5227             lr->lr_event != LE_PEER_EXIT)
5228                 RETURN(-EINVAL);
5229
5230         if (lr->lr_flags & LEF_FROM_OST)
5231                 ltds = &lfsck->li_ost_descs;
5232         else
5233                 ltds = &lfsck->li_mdt_descs;
5234         spin_lock(&ltds->ltd_lock);
5235         ltd = LTD_TGT(ltds, lr->lr_index);
5236         if (ltd == NULL) {
5237                 spin_unlock(&ltds->ltd_lock);
5238
5239                 RETURN(-ENODEV);
5240         }
5241
5242         list_del_init(&ltd->ltd_layout_phase_list);
5243         switch (lr->lr_event) {
5244         case LE_PHASE1_DONE:
5245                 if (lr->lr_status <= 0) {
5246                         ltd->ltd_layout_done = 1;
5247                         list_del_init(&ltd->ltd_layout_list);
5248                         CWARN("%s: %s %x failed/stopped at phase1: rc = %d.\n",
5249                               lfsck_lfsck2name(lfsck),
5250                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5251                               ltd->ltd_index, lr->lr_status);
5252                         lo->ll_flags |= LF_INCOMPLETE;
5253                         fail = true;
5254                         break;
5255                 }
5256
5257                 if (lr->lr_flags & LEF_FROM_OST) {
5258                         if (list_empty(&ltd->ltd_layout_list))
5259                                 list_add_tail(&ltd->ltd_layout_list,
5260                                               &llmd->llmd_ost_list);
5261                         list_add_tail(&ltd->ltd_layout_phase_list,
5262                                       &llmd->llmd_ost_phase2_list);
5263                 } else {
5264                         if (list_empty(&ltd->ltd_layout_list))
5265                                 list_add_tail(&ltd->ltd_layout_list,
5266                                               &llmd->llmd_mdt_list);
5267                         list_add_tail(&ltd->ltd_layout_phase_list,
5268                                       &llmd->llmd_mdt_phase2_list);
5269                 }
5270                 break;
5271         case LE_PHASE2_DONE:
5272                 ltd->ltd_layout_done = 1;
5273                 list_del_init(&ltd->ltd_layout_list);
5274                 break;
5275         case LE_PEER_EXIT:
5276                 fail = true;
5277                 ltd->ltd_layout_done = 1;
5278                 list_del_init(&ltd->ltd_layout_list);
5279                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
5280                         CWARN("%s: the peer %s %x exit layout LFSCK.\n",
5281                               lfsck_lfsck2name(lfsck),
5282                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5283                               ltd->ltd_index);
5284                         lo->ll_flags |= LF_INCOMPLETE;
5285                 }
5286                 break;
5287         default:
5288                 break;
5289         }
5290         spin_unlock(&ltds->ltd_lock);
5291
5292         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5293                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5294
5295                 memset(stop, 0, sizeof(*stop));
5296                 stop->ls_status = lr->lr_status;
5297                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5298                 lfsck_stop(env, lfsck->li_bottom, stop);
5299         } else if (lfsck_layout_master_to_orphan(llmd)) {
5300                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5301         }
5302
5303         RETURN(0);
5304 }
5305
5306 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5307                                         struct lfsck_component *com,
5308                                         struct lfsck_request *lr)
5309 {
5310         struct lfsck_instance            *lfsck = com->lc_lfsck;
5311         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5312         struct lfsck_layout_slave_target *llst;
5313         int                               rc;
5314         ENTRY;
5315
5316         switch (lr->lr_event) {
5317         case LE_FID_ACCESSED:
5318                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5319                 RETURN(0);
5320         case LE_CONDITIONAL_DESTROY:
5321                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5322                 RETURN(rc);
5323         case LE_PAIRS_VERIFY: {
5324                 lr->lr_status = LPVS_INIT;
5325                 /* Firstly, if the MDT-object which is claimed via OST-object
5326                  * local stored PFID xattr recognizes the OST-object, then it
5327                  * must be that the client given PFID is wrong. */
5328                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5329                                                     &lr->lr_fid3);
5330                 if (rc <= 0)
5331                         RETURN(0);
5332
5333                 lr->lr_status = LPVS_INCONSISTENT;
5334                 /* The OST-object local stored PFID xattr is stale. We need to
5335                  * check whether the MDT-object that is claimed via the client
5336                  * given PFID information recognizes the OST-object or not. If
5337                  * matches, then need to update the OST-object's PFID xattr. */
5338                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5339                                                     &lr->lr_fid2);
5340                 /* For rc < 0 case:
5341                  * We are not sure whether the client given PFID information
5342                  * is correct or not, do nothing to avoid improper fixing.
5343                  *
5344                  * For rc > 0 case:
5345                  * The client given PFID information is also invalid, we can
5346                  * NOT fix the OST-object inconsistency.
5347                  */
5348                 if (rc != 0)
5349                         RETURN(rc);
5350
5351                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5352                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5353
5354                 RETURN(rc);
5355         }
5356         case LE_PHASE2_DONE:
5357         case LE_PEER_EXIT:
5358                 break;
5359         default:
5360                 RETURN(-EINVAL);
5361         }
5362
5363         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5364         if (llst == NULL)
5365                 RETURN(-ENODEV);
5366
5367         lfsck_layout_llst_put(llst);
5368         if (list_empty(&llsd->llsd_master_list))
5369                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5370
5371         if (lr->lr_event == LE_PEER_EXIT &&
5372             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5373                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5374
5375                 memset(stop, 0, sizeof(*stop));
5376                 stop->ls_status = lr->lr_status;
5377                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5378                 lfsck_stop(env, lfsck->li_bottom, stop);
5379         }
5380
5381         RETURN(0);
5382 }
5383
5384 static int lfsck_layout_query(const struct lu_env *env,
5385                               struct lfsck_component *com)
5386 {
5387         struct lfsck_layout *lo = com->lc_file_ram;
5388
5389         return lo->ll_status;
5390 }
5391
5392 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5393                                            struct lfsck_component *com,
5394                                            struct lfsck_tgt_descs *ltds,
5395                                            struct lfsck_tgt_desc *ltd,
5396                                            struct ptlrpc_request_set *set)
5397 {
5398         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5399         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5400         struct lfsck_request              *lr    = &info->lti_lr;
5401         struct lfsck_instance             *lfsck = com->lc_lfsck;
5402         int                                rc;
5403
5404         spin_lock(&ltds->ltd_lock);
5405         if (list_empty(&ltd->ltd_layout_list)) {
5406                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5407                 spin_unlock(&ltds->ltd_lock);
5408
5409                 return 0;
5410         }
5411
5412         list_del_init(&ltd->ltd_layout_phase_list);
5413         list_del_init(&ltd->ltd_layout_list);
5414         spin_unlock(&ltds->ltd_lock);
5415
5416         memset(lr, 0, sizeof(*lr));
5417         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5418         lr->lr_event = LE_PEER_EXIT;
5419         lr->lr_active = LT_LAYOUT;
5420         lr->lr_status = LS_CO_PAUSED;
5421         if (ltds == &lfsck->li_ost_descs)
5422                 lr->lr_flags = LEF_TO_OST;
5423
5424         laia->laia_com = com;
5425         laia->laia_ltds = ltds;
5426         atomic_inc(&ltd->ltd_ref);
5427         laia->laia_ltd = ltd;
5428         laia->laia_lr = lr;
5429         laia->laia_shared = 0;
5430
5431         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5432                                  lfsck_layout_master_async_interpret,
5433                                  laia, LFSCK_NOTIFY);
5434         if (rc != 0) {
5435                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
5436                        lfsck_lfsck2name(lfsck),
5437                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5438                        ltd->ltd_index, rc);
5439                 lfsck_tgt_put(ltd);
5440         }
5441
5442         return rc;
5443 }
5444
5445 /* with lfsck::li_lock held */
5446 static int lfsck_layout_slave_join(const struct lu_env *env,
5447                                    struct lfsck_component *com,
5448                                    struct lfsck_start_param *lsp)
5449 {
5450         struct lfsck_instance            *lfsck = com->lc_lfsck;
5451         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5452         struct lfsck_layout_slave_target *llst;
5453         struct lfsck_start               *start = lsp->lsp_start;
5454         int                               rc    = 0;
5455         ENTRY;
5456
5457         if (!lsp->lsp_index_valid || start == NULL ||
5458             !(start->ls_flags & LPF_ALL_TGT) ||
5459             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT))
5460                 RETURN(-EALREADY);
5461
5462         spin_unlock(&lfsck->li_lock);
5463         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5464         spin_lock(&lfsck->li_lock);
5465         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5466                 spin_unlock(&lfsck->li_lock);
5467                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5468                                                       true);
5469                 if (llst != NULL)
5470                         lfsck_layout_llst_put(llst);
5471                 spin_lock(&lfsck->li_lock);
5472                 rc = -EAGAIN;
5473         }
5474
5475         RETURN(rc);
5476 }
5477
5478 static struct lfsck_operations lfsck_layout_master_ops = {
5479         .lfsck_reset            = lfsck_layout_reset,
5480         .lfsck_fail             = lfsck_layout_fail,
5481         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
5482         .lfsck_prep             = lfsck_layout_master_prep,
5483         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
5484         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5485         .lfsck_post             = lfsck_layout_master_post,
5486         .lfsck_interpret        = lfsck_layout_master_async_interpret,
5487         .lfsck_dump             = lfsck_layout_dump,
5488         .lfsck_double_scan      = lfsck_layout_master_double_scan,
5489         .lfsck_data_release     = lfsck_layout_master_data_release,
5490         .lfsck_quit             = lfsck_layout_master_quit,
5491         .lfsck_in_notify        = lfsck_layout_master_in_notify,
5492         .lfsck_query            = lfsck_layout_query,
5493         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
5494 };
5495
5496 static struct lfsck_operations lfsck_layout_slave_ops = {
5497         .lfsck_reset            = lfsck_layout_reset,
5498         .lfsck_fail             = lfsck_layout_fail,
5499         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
5500         .lfsck_prep             = lfsck_layout_slave_prep,
5501         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
5502         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5503         .lfsck_post             = lfsck_layout_slave_post,
5504         .lfsck_dump             = lfsck_layout_dump,
5505         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5506         .lfsck_data_release     = lfsck_layout_slave_data_release,
5507         .lfsck_quit             = lfsck_layout_slave_quit,
5508         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5509         .lfsck_query            = lfsck_layout_query,
5510         .lfsck_join             = lfsck_layout_slave_join,
5511 };
5512
5513 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5514 {
5515         struct lfsck_component  *com;
5516         struct lfsck_layout     *lo;
5517         struct dt_object        *root = NULL;
5518         struct dt_object        *obj;
5519         int                      rc;
5520         ENTRY;
5521
5522         OBD_ALLOC_PTR(com);
5523         if (com == NULL)
5524                 RETURN(-ENOMEM);
5525
5526         INIT_LIST_HEAD(&com->lc_link);
5527         INIT_LIST_HEAD(&com->lc_link_dir);
5528         init_rwsem(&com->lc_sem);
5529         atomic_set(&com->lc_ref, 1);
5530         com->lc_lfsck = lfsck;
5531         com->lc_type = LT_LAYOUT;
5532         if (lfsck->li_master) {
5533                 struct lfsck_layout_master_data *llmd;
5534
5535                 com->lc_ops = &lfsck_layout_master_ops;
5536                 OBD_ALLOC_PTR(llmd);
5537                 if (llmd == NULL)
5538                         GOTO(out, rc = -ENOMEM);
5539
5540                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5541                 spin_lock_init(&llmd->llmd_lock);
5542                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5543                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5544                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5545                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5546                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5547                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5548                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5549                 com->lc_data = llmd;
5550         } else {
5551                 struct lfsck_layout_slave_data *llsd;
5552
5553                 com->lc_ops = &lfsck_layout_slave_ops;
5554                 OBD_ALLOC_PTR(llsd);
5555                 if (llsd == NULL)
5556                         GOTO(out, rc = -ENOMEM);
5557
5558                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5559                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5560                 spin_lock_init(&llsd->llsd_lock);
5561                 llsd->llsd_rb_root = RB_ROOT;
5562                 rwlock_init(&llsd->llsd_rb_lock);
5563                 com->lc_data = llsd;
5564         }
5565         com->lc_file_size = sizeof(*lo);
5566         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5567         if (com->lc_file_ram == NULL)
5568                 GOTO(out, rc = -ENOMEM);
5569
5570         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5571         if (com->lc_file_disk == NULL)
5572                 GOTO(out, rc = -ENOMEM);
5573
5574         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5575         if (IS_ERR(root))
5576                 GOTO(out, rc = PTR_ERR(root));
5577
5578         if (unlikely(!dt_try_as_dir(env, root)))
5579                 GOTO(out, rc = -ENOTDIR);
5580
5581         obj = local_file_find_or_create(env, lfsck->li_los, root,
5582                                         lfsck_layout_name,
5583                                         S_IFREG | S_IRUGO | S_IWUSR);
5584         if (IS_ERR(obj))
5585                 GOTO(out, rc = PTR_ERR(obj));
5586
5587         com->lc_obj = obj;
5588         rc = lfsck_layout_load(env, com);
5589         if (rc > 0)
5590                 rc = lfsck_layout_reset(env, com, true);
5591         else if (rc == -ENOENT)
5592                 rc = lfsck_layout_init(env, com);
5593
5594         if (rc != 0)
5595                 GOTO(out, rc);
5596
5597         lo = com->lc_file_ram;
5598         switch (lo->ll_status) {
5599         case LS_INIT:
5600         case LS_COMPLETED:
5601         case LS_FAILED:
5602         case LS_STOPPED:
5603         case LS_PARTIAL:
5604                 spin_lock(&lfsck->li_lock);
5605                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5606                 spin_unlock(&lfsck->li_lock);
5607                 break;
5608         default:
5609                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
5610                        lfsck_lfsck2name(lfsck), lo->ll_status);
5611                 /* fall through */
5612         case LS_SCANNING_PHASE1:
5613         case LS_SCANNING_PHASE2:
5614                 /* No need to store the status to disk right now.
5615                  * If the system crashed before the status stored,
5616                  * it will be loaded back when next time. */
5617                 lo->ll_status = LS_CRASHED;
5618                 lo->ll_flags |= LF_INCOMPLETE;
5619                 /* fall through */
5620         case LS_PAUSED:
5621         case LS_CRASHED:
5622         case LS_CO_FAILED:
5623         case LS_CO_STOPPED:
5624         case LS_CO_PAUSED:
5625                 spin_lock(&lfsck->li_lock);
5626                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5627                 spin_unlock(&lfsck->li_lock);
5628                 break;
5629         }
5630
5631         if (lo->ll_flags & LF_CRASHED_LASTID) {
5632                 LASSERT(lfsck->li_out_notify != NULL);
5633
5634                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5635                                      LE_LASTID_REBUILDING);
5636         }
5637
5638         GOTO(out, rc = 0);
5639
5640 out:
5641         if (root != NULL && !IS_ERR(root))
5642                 lu_object_put(env, &root->do_lu);
5643
5644         if (rc != 0)
5645                 lfsck_component_cleanup(env, com);
5646
5647         return rc;
5648 }
5649
/* Private state of one orphan iterator, see lfsck_orphan_it_init(). */
struct lfsck_orphan_it {
        /* layout component; the reference is released in
         * lfsck_orphan_it_fini() */
        struct lfsck_component           *loi_com;
        /* rbtree node the iteration currently works on, NULL if the next
         * step has to search for one */
        struct lfsck_rbtree_node         *loi_lrn;
        /* slave target the iteration is done for; the key and hash are
         * saved into it when the iterator is finalized */
        struct lfsck_layout_slave_target *loi_llst;
        /* FID the iteration currently stands at */
        struct lu_fid                     loi_key;
        /* record buffer used by lfsck_orphan_it_next() */
        struct lu_orphan_rec              loi_rec;
        /* hash saved into the slave target at fini time */
        __u64                             loi_hash;
        /* set once the iteration has reached the end */
        unsigned int                      loi_over:1;
};
5659
5660 static int lfsck_fid_match_idx(const struct lu_env *env,
5661                                struct lfsck_instance *lfsck,
5662                                const struct lu_fid *fid, int idx)
5663 {
5664         struct seq_server_site  *ss;
5665         struct lu_server_fld    *sf;
5666         struct lu_seq_range      range  = { 0 };
5667         int                      rc;
5668
5669         /* All abnormal cases will be returned to MDT0. */
5670         if (!fid_is_norm(fid)) {
5671                 if (idx == 0)
5672                         return 1;
5673
5674                 return 0;
5675         }
5676
5677         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5678         if (unlikely(ss == NULL))
5679                 return -ENOTCONN;
5680
5681         sf = ss->ss_server_fld;
5682         LASSERT(sf != NULL);
5683
5684         fld_range_set_any(&range);
5685         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
5686         if (rc != 0)
5687                 return rc;
5688
5689         if (!fld_range_is_mdt(&range))
5690                 return -EINVAL;
5691
5692         if (range.lsr_index == idx)
5693                 return 1;
5694
5695         return 0;
5696 }
5697
5698 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5699                                         struct dt_device *dev,
5700                                         struct dt_object *obj)
5701 {
5702         struct thandle *handle;
5703         int             rc;
5704         ENTRY;
5705
5706         handle = dt_trans_create(env, dev);
5707         if (IS_ERR(handle))
5708                 RETURN_EXIT;
5709
5710         rc = dt_declare_ref_del(env, obj, handle);
5711         if (rc != 0)
5712                 GOTO(stop, rc);
5713
5714         rc = dt_declare_destroy(env, obj, handle);
5715         if (rc != 0)
5716                 GOTO(stop, rc);
5717
5718         rc = dt_trans_start_local(env, dev, handle);
5719         if (rc != 0)
5720                 GOTO(stop, rc);
5721
5722         dt_write_lock(env, obj, 0);
5723         rc = dt_ref_del(env, obj, handle);
5724         if (rc == 0)
5725                 rc = dt_destroy(env, obj, handle);
5726         dt_write_unlock(env, obj);
5727
5728         GOTO(stop, rc);
5729
5730 stop:
5731         dt_trans_stop(env, dev, handle);
5732
5733         RETURN_EXIT;
5734 }
5735
/*
 * Lookup on the orphan index: not supported.
 *
 * \retval -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_lookup(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct lustre_capa *capa)
{
        /* Unconditional: none of the arguments is looked at. */
        return -EOPNOTSUPP;
}
5744
/*
 * Declare an insert into the orphan index: not supported.
 *
 * \retval -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_rec *rec,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        /* Unconditional: none of the arguments is looked at. */
        return -EOPNOTSUPP;
}
5753
/*
 * Insert into the orphan index: not supported.
 *
 * \retval -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa,
                                     int ignore_quota)
{
        /* Unconditional: none of the arguments is looked at. */
        return -EOPNOTSUPP;
}
5764
/*
 * Declare a delete from the orphan index: not supported.
 *
 * \retval -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        /* Unconditional: none of the arguments is looked at. */
        return -EOPNOTSUPP;
}
5772
/*
 * Delete from the orphan index: not supported.
 *
 * \retval -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa)
{
        /* Unconditional: none of the arguments is looked at. */
        return -EOPNOTSUPP;
}
5781
5782 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5783                                           struct dt_object *dt,
5784                                           __u32 attr,
5785                                           struct lustre_capa *capa)
5786 {
5787         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
5788         struct lfsck_instance           *lfsck;
5789         struct lfsck_component          *com    = NULL;
5790         struct lfsck_layout_slave_data  *llsd;
5791         struct lfsck_orphan_it          *it     = NULL;
5792         int                              rc     = 0;
5793         ENTRY;
5794
5795         lfsck = lfsck_instance_find(dev, true, false);
5796         if (unlikely(lfsck == NULL))
5797                 RETURN(ERR_PTR(-ENODEV));
5798
5799         com = lfsck_component_find(lfsck, LT_LAYOUT);
5800         if (unlikely(com == NULL))
5801                 GOTO(out, rc = -ENOENT);
5802
5803         llsd = com->lc_data;
5804         if (!llsd->llsd_rbtree_valid)
5805                 GOTO(out, rc = -ESRCH);
5806
5807         OBD_ALLOC_PTR(it);
5808         if (it == NULL)
5809                 GOTO(out, rc = -ENOMEM);
5810
5811         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5812         if (it->loi_llst == NULL)
5813                 GOTO(out, rc = -ENODEV);
5814
5815         if (dev->dd_record_fid_accessed) {
5816                 /* The first iteration against the rbtree, scan the whole rbtree
5817                  * to remove the nodes which do NOT need to be handled. */
5818                 write_lock(&llsd->llsd_rb_lock);
5819                 if (dev->dd_record_fid_accessed) {
5820                         struct rb_node                  *node;
5821                         struct rb_node                  *next;
5822                         struct lfsck_rbtree_node        *lrn;
5823
5824                         /* No need to record the fid accessing anymore. */
5825                         dev->dd_record_fid_accessed = 0;
5826
5827                         node = rb_first(&llsd->llsd_rb_root);
5828                         while (node != NULL) {
5829                                 next = rb_next(node);
5830                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
5831                                                lrn_node);
5832                                 if (atomic_read(&lrn->lrn_known_count) <=
5833                                     atomic_read(&lrn->lrn_accessed_count)) {
5834                                         rb_erase(node, &llsd->llsd_rb_root);
5835                                         lfsck_rbtree_free(lrn);
5836                                 }
5837                                 node = next;
5838                         }
5839                 }
5840                 write_unlock(&llsd->llsd_rb_lock);
5841         }
5842
5843         /* read lock the rbtree when init, and unlock when fini */
5844         read_lock(&llsd->llsd_rb_lock);
5845         it->loi_com = com;
5846         com = NULL;
5847
5848         GOTO(out, rc = 0);
5849
5850 out:
5851         if (com != NULL)
5852                 lfsck_component_put(env, com);
5853         lfsck_instance_put(env, lfsck);
5854         if (rc != 0) {
5855                 if (it != NULL)
5856                         OBD_FREE_PTR(it);
5857
5858                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5859         }
5860
5861         return (struct dt_it *)it;
5862 }
5863
5864 static void lfsck_orphan_it_fini(const struct lu_env *env,
5865                                  struct dt_it *di)
5866 {
5867         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
5868         struct lfsck_component           *com   = it->loi_com;
5869         struct lfsck_layout_slave_data   *llsd;
5870         struct lfsck_layout_slave_target *llst;
5871
5872         if (com != NULL) {
5873                 llsd = com->lc_data;
5874                 read_unlock(&llsd->llsd_rb_lock);
5875                 llst = it->loi_llst;
5876                 LASSERT(llst != NULL);
5877
5878                 /* Save the key and hash for iterate next. */
5879                 llst->llst_fid = it->loi_key;
5880                 llst->llst_hash = it->loi_hash;
5881                 lfsck_layout_llst_put(llst);
5882                 lfsck_component_put(env, com);
5883         }
5884         OBD_FREE_PTR(it);
5885 }
5886
5887 /**
5888  * \retval       +1: the iteration finished
5889  * \retval        0: on success, not finished
5890  * \retval      -ve: on error
5891  */
5892 static int lfsck_orphan_it_next(const struct lu_env *env,
5893                                 struct dt_it *di)
5894 {
5895         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5896         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
5897         struct lu_attr                  *la     = &info->lti_la;
5898         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
5899         struct lu_fid                   *key    = &it->loi_key;
5900         struct lu_orphan_rec            *rec    = &it->loi_rec;
5901         struct lfsck_component          *com    = it->loi_com;
5902         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5903         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5904         struct dt_object                *obj;
5905         struct lfsck_rbtree_node        *lrn;
5906         int                              pos;
5907         int                              rc;
5908         __u32                            save;
5909         __u32                            idx    = it->loi_llst->llst_index;
5910         bool                             exact  = false;
5911         ENTRY;
5912
5913         if (it->loi_over)
5914                 RETURN(1);
5915
5916 again0:
5917         lrn = it->loi_lrn;
5918         if (lrn == NULL) {
5919                 lrn = lfsck_rbtree_search(llsd, key, &exact);
5920                 if (lrn == NULL) {
5921                         it->loi_over = 1;
5922                         RETURN(1);
5923                 }
5924
5925                 it->loi_lrn = lrn;
5926                 if (!exact) {
5927                         key->f_seq = lrn->lrn_seq;
5928                         key->f_oid = lrn->lrn_first_oid;
5929                         key->f_ver = 0;
5930                 }
5931         } else {
5932                 key->f_oid++;
5933                 if (unlikely(key->f_oid == 0)) {
5934                         key->f_seq++;
5935                         it->loi_lrn = NULL;
5936                         goto again0;
5937                 }
5938
5939                 if (key->f_oid >=
5940                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
5941                         it->loi_lrn = NULL;
5942                         goto again0;
5943                 }
5944         }
5945
5946         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5947                      atomic_read(&lrn->lrn_accessed_count))) {
5948                 struct rb_node *next = rb_next(&lrn->lrn_node);
5949
5950                 while (next != NULL) {
5951                         lrn = rb_entry(next, struct lfsck_rbtree_node,
5952                                        lrn_node);
5953                         if (atomic_read(&lrn->lrn_known_count) >
5954                             atomic_read(&lrn->lrn_accessed_count))
5955                                 break;
5956                         next = rb_next(next);
5957                 }
5958
5959                 if (next == NULL) {
5960                         it->loi_over = 1;
5961                         RETURN(1);
5962                 }
5963
5964                 it->loi_lrn = lrn;
5965                 key->f_seq = lrn->lrn_seq;
5966                 key->f_oid = lrn->lrn_first_oid;
5967                 key->f_ver = 0;
5968         }
5969
5970         pos = key->f_oid - lrn->lrn_first_oid;
5971
5972 again1:
5973         pos = find_next_bit(lrn->lrn_known_bitmap,
5974                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
5975         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5976                 key->f_oid = lrn->lrn_first_oid + pos;
5977                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
5978                         key->f_seq++;
5979                         key->f_oid = 0;
5980                 }
5981                 it->loi_lrn = NULL;
5982                 goto again0;
5983         }
5984
5985         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5986                 pos++;
5987                 goto again1;
5988         }
5989
5990         key->f_oid = lrn->lrn_first_oid + pos;
5991         obj = lfsck_object_find(env, lfsck, key);
5992         if (IS_ERR(obj)) {
5993                 rc = PTR_ERR(obj);
5994                 if (rc == -ENOENT) {
5995                         pos++;
5996                         goto again1;
5997                 }
5998                 RETURN(rc);
5999         }
6000
6001         dt_read_lock(env, obj, 0);
6002         if (!dt_object_exists(obj)) {
6003                 dt_read_unlock(env, obj);
6004                 lfsck_object_put(env, obj);
6005                 pos++;
6006                 goto again1;
6007         }
6008
6009         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
6010         if (rc != 0)
6011                 GOTO(out, rc);
6012
6013         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6014                           XATTR_NAME_FID, BYPASS_CAPA);
6015         if (rc == -ENODATA) {
6016                 /* For the pre-created OST-object, update the bitmap to avoid
6017                  * others LFSCK (second phase) iteration to touch it again. */
6018                 if (la->la_ctime == 0) {
6019                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6020                                 atomic_inc(&lrn->lrn_accessed_count);
6021
6022                         /* For the race between repairing dangling referenced
6023                          * MDT-object and unlink the file, it may left orphan
6024                          * OST-object there. Destroy it now! */
6025                         if (unlikely(!(la->la_mode & S_ISUID))) {
6026                                 dt_read_unlock(env, obj);
6027                                 lfsck_layout_destroy_orphan(env,
6028                                                             lfsck->li_bottom,
6029                                                             obj);
6030                                 lfsck_object_put(env, obj);
6031                                 pos++;
6032                                 goto again1;
6033                         }
6034                 } else if (idx == 0) {
6035                         /* If the orphan OST-object has no parent information,
6036                          * regard it as referenced by the MDT-object on MDT0. */
6037                         fid_zero(&rec->lor_fid);
6038                         rec->lor_uid = la->la_uid;
6039                         rec->lor_gid = la->la_gid;
6040                         GOTO(out, rc = 0);
6041                 }
6042
6043                 dt_read_unlock(env, obj);
6044                 lfsck_object_put(env, obj);
6045                 pos++;
6046                 goto again1;
6047         }
6048
6049         if (rc < 0)
6050                 GOTO(out, rc);
6051
6052         if (rc != sizeof(struct filter_fid) &&
6053             rc != sizeof(struct filter_fid_old))
6054                 GOTO(out, rc = -EINVAL);
6055
6056         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6057         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6058          * MDT-object's FID::f_ver, instead it is the OST-object index in its
6059          * parent MDT-object's layout EA. */
6060         save = rec->lor_fid.f_stripe_idx;
6061         rec->lor_fid.f_ver = 0;
6062         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6063         /* If the orphan OST-object does not claim the MDT, then next.
6064          *
6065          * If we do not know whether it matches or not, then return it
6066          * to the MDT for further check. */
6067         if (rc == 0) {
6068                 dt_read_unlock(env, obj);
6069                 lfsck_object_put(env, obj);
6070                 pos++;
6071                 goto again1;
6072         }
6073
6074         rec->lor_fid.f_stripe_idx = save;
6075         rec->lor_uid = la->la_uid;
6076         rec->lor_gid = la->la_gid;
6077
6078         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6079                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6080                rec->lor_uid, rec->lor_gid);
6081
6082         GOTO(out, rc = 0);
6083
6084 out:
6085         dt_read_unlock(env, obj);
6086         lfsck_object_put(env, obj);
6087         if (rc == 0)
6088                 it->loi_hash++;
6089
6090         return rc;
6091 }
6092
6093 /**
6094  * \retval       +1: locate to the exactly position
6095  * \retval        0: cannot locate to the exactly position,
6096  *                   call next() to move to a valid position.
6097  * \retval      -ve: on error
6098  */
6099 static int lfsck_orphan_it_get(const struct lu_env *env,
6100                                struct dt_it *di,
6101                                const struct dt_key *key)
6102 {
6103         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6104         int                      rc;
6105
6106         it->loi_key = *(struct lu_fid *)key;
6107         rc = lfsck_orphan_it_next(env, di);
6108         if (rc == 1)
6109                 return 0;
6110
6111         if (rc == 0)
6112                 return 1;
6113
6114         return rc;
6115 }
6116
/**
 * Iterator "put" callback for the orphan index.
 *
 * Intentionally empty: this function performs no work.
 *
 * \param[in] env       execution environment (unused)
 * \param[in] di        the iterator (unused)
 */
static void lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
        /* Nothing to do. */
}
6121
6122 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6123                                           const struct dt_it *di)
6124 {
6125         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6126
6127         return (struct dt_key *)&it->loi_key;
6128 }
6129
6130 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6131                                     const struct dt_it *di)
6132 {
6133         return sizeof(struct lu_fid);
6134 }
6135
6136 static int lfsck_orphan_it_rec(const struct lu_env *env,
6137                                const struct dt_it *di,
6138                                struct dt_rec *rec,
6139                                __u32 attr)
6140 {
6141         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6142
6143         *(struct lu_orphan_rec *)rec = it->loi_rec;
6144
6145         return 0;
6146 }
6147
6148 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6149                                    const struct dt_it *di)
6150 {
6151         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6152
6153         return it->loi_hash;
6154 }
6155
6156 /**
6157  * \retval       +1: locate to the exactly position
6158  * \retval        0: cannot locate to the exactly position,
6159  *                   call next() to move to a valid position.
6160  * \retval      -ve: on error
6161  */
6162 static int lfsck_orphan_it_load(const struct lu_env *env,
6163                                 const struct dt_it *di,
6164                                 __u64 hash)
6165 {
6166         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6167         struct lfsck_layout_slave_target *llst = it->loi_llst;
6168         int                               rc;
6169
6170         LASSERT(llst != NULL);
6171
6172         if (hash != llst->llst_hash) {
6173                 CWARN("%s: the given hash "LPU64" for orphan iteration does "
6174                       "not match the one when fini "LPU64", to be reset.\n",
6175                       lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6176                       llst->llst_hash);
6177                 fid_zero(&llst->llst_fid);
6178                 llst->llst_hash = 0;
6179         }
6180
6181         it->loi_key = llst->llst_fid;
6182         it->loi_hash = llst->llst_hash;
6183         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6184         if (rc == 1)
6185                 return 0;
6186
6187         if (rc == 0)
6188                 return 1;
6189
6190         return rc;
6191 }
6192
/**
 * Iterator "key_rec" callback for the orphan index.
 *
 * Does nothing and leaves \a key_rec untouched.
 *
 * \param[in] env       execution environment (unused)
 * \param[in] di        the iterator (unused)
 * \param[in] key_rec   unused
 *
 * \retval        0: always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
                                   const struct dt_it *di, void *key_rec)
{
        return 0;
}
6199
6200 const struct dt_index_operations lfsck_orphan_index_ops = {
6201         .dio_lookup             = lfsck_orphan_index_lookup,
6202         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
6203         .dio_insert             = lfsck_orphan_index_insert,
6204         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
6205         .dio_delete             = lfsck_orphan_index_delete,
6206         .dio_it = {
6207                 .init           = lfsck_orphan_it_init,
6208                 .fini           = lfsck_orphan_it_fini,
6209                 .get            = lfsck_orphan_it_get,
6210                 .put            = lfsck_orphan_it_put,
6211                 .next           = lfsck_orphan_it_next,
6212                 .key            = lfsck_orphan_it_key,
6213                 .key_size       = lfsck_orphan_it_key_size,
6214                 .rec            = lfsck_orphan_it_rec,
6215                 .store          = lfsck_orphan_it_store,
6216                 .load           = lfsck_orphan_it_load,
6217                 .key_rec        = lfsck_orphan_it_key_rec,
6218         }
6219 };