Whamcloud - gitweb
2fe033e4756f3383230d57e6d45f997ca454e555
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_linkea.h>
43 #include <lustre_fid.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <md_object.h>
48 #include <obd_class.h>
49
50 #include "lfsck_internal.h"
51
/* Magic value stored in lfsck_layout::ll_magic. lfsck_layout_init() writes
 * it, and lfsck_layout_load() treats any other value as a broken trace
 * record that must be reset. */
#define LFSCK_LAYOUT_MAGIC              0xB173AE14

/* Name string of the layout LFSCK component. */
static const char lfsck_layout_name[] = "lfsck_layout";
55
/* Per-sequence record kept on lfsck_layout_slave_data::llsd_seq_list.
 * lfsck_layout_seq_insert() keeps that list sorted by ascending lls_seq. */
struct lfsck_layout_seq {
        /* link into lfsck_layout_slave_data::llsd_seq_list. */
        struct list_head         lls_list;
        /* The sequence this record stands for; also the list sort key. */
        __u64                    lls_seq;
        /* NOTE(review): presumably the LAST_ID value tracked for this
         * sequence -- confirm against the users of this field. */
        __u64                    lls_lastid;
        /* NOTE(review): presumably the largest object ID seen so far for
         * this sequence -- confirm against the users of this field. */
        __u64                    lls_lastid_known;
        /* NOTE(review): presumably the object holding the LAST_ID file for
         * this sequence -- confirm against the users of this field. */
        struct dt_object        *lls_lastid_obj;
        /* NOTE(review): presumably set when lls_lastid needs to be written
         * back -- confirm against the users of this field. */
        unsigned int             lls_dirty:1;
};
64
/* One entry per master tracked by a layout LFSCK slave. Entries are
 * reference counted (llst_ref) and freed by lfsck_layout_llst_put() when
 * the last reference is dropped. */
struct lfsck_layout_slave_target {
        /* link into lfsck_layout_slave_data::llsd_master_list. */
        struct list_head        llst_list;
        /* The position for next record in the rbtree for iteration. */
        struct lu_fid           llst_fid;
        /* Dummy hash for iteration against the rbtree. */
        __u64                   llst_hash;
        /* Generation counter, zeroed by lfsck_layout_llst_add().
         * NOTE(review): presumably compared with llsd_touch_gen -- confirm. */
        __u64                   llst_gen;
        /* Reference count; starts at 1 (the list's reference). */
        atomic_t                llst_ref;
        /* Index of the master; unique within llsd_master_list. */
        __u32                   llst_index;
};
76
/* Private data of the layout LFSCK component when running as a slave. */
struct lfsck_layout_slave_data {
        /* list for lfsck_layout_seq */
        struct list_head         llsd_seq_list;

        /* list for the masters involve layout verification. */
        struct list_head         llsd_master_list;
        /* Protects llsd_master_list (see the lfsck_layout_llst_*()
         * helpers). */
        spinlock_t               llsd_lock;
        /* NOTE(review): presumably the generation used to mark targets
         * already visited in the current pass -- confirm. */
        __u64                    llsd_touch_gen;
        /* In-RAM object standing for the rbtree; set up by
         * lfsck_rbtree_setup() and released by lfsck_rbtree_cleanup(). */
        struct dt_object        *llsd_rb_obj;
        /* Root of the rbtree of lfsck_rbtree_node. */
        struct rb_root           llsd_rb_root;
        /* Protects llsd_rb_root and llsd_rbtree_valid. */
        rwlock_t                 llsd_rb_lock;
        /* Cleared by lfsck_rbtree_cleanup(); once clear, nobody may use
         * the rbtree any more. */
        unsigned int             llsd_rbtree_valid:1;
};
90
/* Reference-counted wrapper around a dt_object, created by
 * lfsck_layout_object_init() and released by lfsck_layout_object_put(). */
struct lfsck_layout_object {
        /* The wrapped object; a reference is held for the wrapper's life. */
        struct dt_object        *llo_obj;
        /* Attributes of llo_obj fetched when the wrapper was created. */
        struct lu_attr           llo_attr;
        /* Reference count; each lfsck_layout_req holds one reference. */
        atomic_t                 llo_ref;
        /* Layout generation recorded at creation time, used to detect
         * layout changes between pre-fetching and real verification. */
        __u16                    llo_gen;
};
97
/* One verification request: a (parent object, child object) pair. */
struct lfsck_layout_req {
        /* List linkage, initialised empty by lfsck_layout_req_init().
         * NOTE(review): presumably linked into
         * lfsck_layout_master_data::llmd_req_list -- confirm. */
        struct list_head                 llr_list;
        /* The parent wrapper; a reference on it is held by this request. */
        struct lfsck_layout_object      *llr_parent;
        /* The child object; its reference is dropped by
         * lfsck_layout_req_fini(). */
        struct dt_object                *llr_child;
        /* Index of the OST that holds the child object. */
        __u32                            llr_ost_idx;
        __u32                            llr_lov_idx; /* offset in LOV EA */
};
105
/* Private data of the layout LFSCK component when running as a master. */
struct lfsck_layout_master_data {
        /* Protects llmd_req_list (see lfsck_layout_req_empty()). */
        spinlock_t              llmd_lock;
        /* list for the pending lfsck_layout_req. */
        struct list_head        llmd_req_list;

        /* list for the ost targets involve layout verification. */
        struct list_head        llmd_ost_list;

        /* list for the ost targets in phase1 scanning. */
        struct list_head        llmd_ost_phase1_list;

        /* list for the ost targets in phase2 scanning. */
        struct list_head        llmd_ost_phase2_list;

        /* list for the mdt targets involve layout verification. */
        struct list_head        llmd_mdt_list;

        /* list for the mdt targets in phase1 scanning. */
        struct list_head        llmd_mdt_phase1_list;

        /* list for the mdt targets in phase2 scanning. */
        struct list_head        llmd_mdt_phase2_list;

        /* NOTE(review): presumably the assistant thread that consumes
         * llmd_req_list -- confirm. */
        struct ptlrpc_thread    llmd_thread;
        __u32                   llmd_touch_gen;
        /* NOTE(review): presumably the count of pre-fetched requests not
         * yet handled -- confirm. */
        int                     llmd_prefetched;
        int                     llmd_assistant_status;
        int                     llmd_post_result;
        unsigned int            llmd_to_post:1,
                                llmd_to_double_scan:1,
                                llmd_in_double_scan:1,
                                llmd_exit:1;
};
138
/* Bundle of pointers passed along with an asynchronous request issued by
 * the layout LFSCK slave.
 * NOTE(review): the users of this struct are outside this part of the
 * file; the per-field meaning below is inferred from the types only. */
struct lfsck_layout_slave_async_args {
        /* Export the request is sent through. */
        struct obd_export                *llsaa_exp;
        /* The layout LFSCK component the request belongs to. */
        struct lfsck_component           *llsaa_com;
        /* The master target the request is about. */
        struct lfsck_layout_slave_target *llsaa_llst;
};
144
145 static struct lfsck_layout_object *
146 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
147                          __u16 gen)
148 {
149         struct lfsck_layout_object *llo;
150         int                         rc;
151
152         OBD_ALLOC_PTR(llo);
153         if (llo == NULL)
154                 return ERR_PTR(-ENOMEM);
155
156         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
157         if (rc != 0) {
158                 OBD_FREE_PTR(llo);
159
160                 return ERR_PTR(rc);
161         }
162
163         lu_object_get(&obj->do_lu);
164         llo->llo_obj = obj;
165         /* The gen can be used to check whether some others have changed the
166          * file layout after LFSCK pre-fetching but before real verification. */
167         llo->llo_gen = gen;
168         atomic_set(&llo->llo_ref, 1);
169
170         return llo;
171 }
172
173 static inline void
174 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
175 {
176         if (atomic_dec_and_test(&llst->llst_ref)) {
177                 LASSERT(list_empty(&llst->llst_list));
178
179                 OBD_FREE_PTR(llst);
180         }
181 }
182
183 static inline int
184 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
185 {
186         struct lfsck_layout_slave_target *llst;
187         struct lfsck_layout_slave_target *tmp;
188         int                               rc   = 0;
189
190         OBD_ALLOC_PTR(llst);
191         if (llst == NULL)
192                 return -ENOMEM;
193
194         INIT_LIST_HEAD(&llst->llst_list);
195         llst->llst_gen = 0;
196         llst->llst_index = index;
197         atomic_set(&llst->llst_ref, 1);
198
199         spin_lock(&llsd->llsd_lock);
200         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
201                 if (tmp->llst_index == index) {
202                         rc = -EALREADY;
203                         break;
204                 }
205         }
206         if (rc == 0)
207                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
208         spin_unlock(&llsd->llsd_lock);
209
210         if (rc != 0)
211                 OBD_FREE_PTR(llst);
212
213         return rc;
214 }
215
216 static inline void
217 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
218                       struct lfsck_layout_slave_target *llst)
219 {
220         bool del = false;
221
222         spin_lock(&llsd->llsd_lock);
223         if (!list_empty(&llst->llst_list)) {
224                 list_del_init(&llst->llst_list);
225                 del = true;
226         }
227         spin_unlock(&llsd->llsd_lock);
228
229         if (del)
230                 lfsck_layout_llst_put(llst);
231 }
232
233 static inline struct lfsck_layout_slave_target *
234 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
235                                __u32 index, bool unlink)
236 {
237         struct lfsck_layout_slave_target *llst;
238
239         spin_lock(&llsd->llsd_lock);
240         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
241                 if (llst->llst_index == index) {
242                         if (unlink)
243                                 list_del_init(&llst->llst_list);
244                         else
245                                 atomic_inc(&llst->llst_ref);
246                         spin_unlock(&llsd->llsd_lock);
247
248                         return llst;
249                 }
250         }
251         spin_unlock(&llsd->llsd_lock);
252
253         return NULL;
254 }
255
256 static inline void lfsck_layout_object_put(const struct lu_env *env,
257                                            struct lfsck_layout_object *llo)
258 {
259         if (atomic_dec_and_test(&llo->llo_ref)) {
260                 lfsck_object_put(env, llo->llo_obj);
261                 OBD_FREE_PTR(llo);
262         }
263 }
264
265 static struct lfsck_layout_req *
266 lfsck_layout_req_init(struct lfsck_layout_object *parent,
267                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
268 {
269         struct lfsck_layout_req *llr;
270
271         OBD_ALLOC_PTR(llr);
272         if (llr == NULL)
273                 return ERR_PTR(-ENOMEM);
274
275         INIT_LIST_HEAD(&llr->llr_list);
276         atomic_inc(&parent->llo_ref);
277         llr->llr_parent = parent;
278         llr->llr_child = child;
279         llr->llr_ost_idx = ost_idx;
280         llr->llr_lov_idx = lov_idx;
281
282         return llr;
283 }
284
285 static inline void lfsck_layout_req_fini(const struct lu_env *env,
286                                          struct lfsck_layout_req *llr)
287 {
288         lu_object_put(env, &llr->llr_child->do_lu);
289         lfsck_layout_object_put(env, llr->llr_parent);
290         OBD_FREE_PTR(llr);
291 }
292
293 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
294 {
295         bool empty = false;
296
297         spin_lock(&llmd->llmd_lock);
298         if (list_empty(&llmd->llmd_req_list))
299                 empty = true;
300         spin_unlock(&llmd->llmd_lock);
301
302         return empty;
303 }
304
305 static int lfsck_layout_get_lovea(const struct lu_env *env,
306                                   struct dt_object *obj,
307                                   struct lu_buf *buf, ssize_t *buflen)
308 {
309         int rc;
310
311 again:
312         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
313         if (rc == -ERANGE) {
314                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
315                                   BYPASS_CAPA);
316                 if (rc <= 0)
317                         return rc;
318
319                 lu_buf_realloc(buf, rc);
320                 if (buflen != NULL)
321                         *buflen = buf->lb_len;
322
323                 if (buf->lb_buf == NULL)
324                         return -ENOMEM;
325
326                 goto again;
327         }
328
329         if (rc == -ENODATA)
330                 rc = 0;
331
332         if (rc <= 0)
333                 return rc;
334
335         if (unlikely(buf->lb_buf == NULL)) {
336                 lu_buf_alloc(buf, rc);
337                 if (buflen != NULL)
338                         *buflen = buf->lb_len;
339
340                 if (buf->lb_buf == NULL)
341                         return -ENOMEM;
342
343                 goto again;
344         }
345
346         return rc;
347 }
348
349 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
350 {
351         __u32 magic;
352         __u32 patten;
353
354         magic = le32_to_cpu(lmm->lmm_magic);
355         /* If magic crashed, keep it there. Sometime later, during OST-object
356          * orphan handling, if some OST-object(s) back-point to it, it can be
357          * verified and repaired. */
358         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
359                 return -EINVAL;
360
361         patten = le32_to_cpu(lmm->lmm_pattern);
362         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
363         if (patten != LOV_PATTERN_RAID0)
364                 return -EOPNOTSUPP;
365
366         return 0;
367 }
368
/*
 * Geometry of the per-node bitmaps in the layout rbtree.
 *
 * SIZE  - bytes allocated for each bitmap (one page).
 * WIDTH - number of bits in a bitmap, i.e. how many consecutive OIDs one
 *	   rbtree node covers (see lfsck_rbtree_cmp()).
 * MASK  - extracts the bit index of an OID inside its node; its complement
 *	   rounds an OID down to the first OID of the node.
 *
 * MASK must be derived from WIDTH (a bit count), not from SIZE (a byte
 * count). Otherwise OIDs that are SIZE apart land on the same bit of the
 * same node and most of each bitmap is never used.
 */
#define LFSCK_RBTREE_BITMAP_SIZE	PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH	(LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK	(LFSCK_RBTREE_BITMAP_WIDTH - 1)
372
/* One node of the layout rbtree. It describes a run of
 * LFSCK_RBTREE_BITMAP_WIDTH consecutive OIDs within one sequence, with one
 * bit per OID in each of the two bitmaps. */
struct lfsck_rbtree_node {
        /* link into lfsck_layout_slave_data::llsd_rb_root. */
        struct rb_node   lrn_node;
        /* The sequence covered by this node. */
        __u64            lrn_seq;
        /* The first OID covered by this node. */
        __u32            lrn_first_oid;
        /* Count of the bits set in lrn_known_bitmap. */
        atomic_t         lrn_known_count;
        /* Count of the bits set in lrn_accessed_bitmap. */
        atomic_t         lrn_accessed_count;
        /* One bit per OID: set once the object is known to exist.
         * LFSCK_RBTREE_BITMAP_SIZE bytes. */
        void            *lrn_known_bitmap;
        /* One bit per OID: set once the object has been accessed.
         * LFSCK_RBTREE_BITMAP_SIZE bytes. */
        void            *lrn_accessed_bitmap;
};
382
383 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
384                                    __u64 seq, __u32 oid)
385 {
386         if (seq < lrn->lrn_seq)
387                 return -1;
388
389         if (seq > lrn->lrn_seq)
390                 return 1;
391
392         if (oid < lrn->lrn_first_oid)
393                 return -1;
394
395         if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH)
396                 return 1;
397
398         return 0;
399 }
400
401 /* The caller should hold llsd->llsd_rb_lock. */
402 static struct lfsck_rbtree_node *
403 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
404                     const struct lu_fid *fid, bool *exact)
405 {
406         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
407         struct rb_node           *prev  = NULL;
408         struct lfsck_rbtree_node *lrn   = NULL;
409         int                       rc    = 0;
410
411         if (exact != NULL)
412                 *exact = true;
413
414         while (node != NULL) {
415                 prev = node;
416                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
417                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
418                 if (rc < 0)
419                         node = node->rb_left;
420                 else if (rc > 0)
421                         node = node->rb_right;
422                 else
423                         return lrn;
424         }
425
426         if (exact == NULL)
427                 return NULL;
428
429         /* If there is no exactly matched one, then to the next valid one. */
430         *exact = false;
431
432         /* The rbtree is empty. */
433         if (rc == 0)
434                 return NULL;
435
436         if (rc < 0)
437                 return lrn;
438
439         node = rb_next(prev);
440
441         /* The end of the rbtree. */
442         if (node == NULL)
443                 return NULL;
444
445         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
446
447         return lrn;
448 }
449
450 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
451                                                   const struct lu_fid *fid)
452 {
453         struct lfsck_rbtree_node *lrn;
454
455         OBD_ALLOC_PTR(lrn);
456         if (lrn == NULL)
457                 return ERR_PTR(-ENOMEM);
458
459         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
460         if (lrn->lrn_known_bitmap == NULL) {
461                 OBD_FREE_PTR(lrn);
462
463                 return ERR_PTR(-ENOMEM);
464         }
465
466         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
467         if (lrn->lrn_accessed_bitmap == NULL) {
468                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
469                 OBD_FREE_PTR(lrn);
470
471                 return ERR_PTR(-ENOMEM);
472         }
473
474         rb_init_node(&lrn->lrn_node);
475         lrn->lrn_seq = fid_seq(fid);
476         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
477         atomic_set(&lrn->lrn_known_count, 0);
478         atomic_set(&lrn->lrn_accessed_count, 0);
479
480         return lrn;
481 }
482
/* Free the rbtree node \a lrn together with both of its bitmaps. The node
 * is not unlinked here: it must not be (or no longer be) in the rbtree. */
static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
{
        OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
        OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
        OBD_FREE_PTR(lrn);
}
489
490 /* The caller should hold lock. */
491 static struct lfsck_rbtree_node *
492 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
493                     struct lfsck_rbtree_node *lrn)
494 {
495         struct rb_node           **pos    = &(llsd->llsd_rb_root.rb_node);
496         struct rb_node            *parent = NULL;
497         struct lfsck_rbtree_node  *tmp;
498         int                        rc;
499
500         while (*pos) {
501                 parent = *pos;
502                 tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node);
503                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
504                 if (rc < 0)
505                         pos = &((*pos)->rb_left);
506                 else if (rc > 0)
507                         pos = &((*pos)->rb_right);
508                 else
509                         return tmp;
510         }
511
512         rb_link_node(&lrn->lrn_node, parent, pos);
513         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
514
515         return lrn;
516 }
517
518 extern const struct dt_index_operations lfsck_orphan_index_ops;
519
520 static int lfsck_rbtree_setup(const struct lu_env *env,
521                               struct lfsck_component *com)
522 {
523         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
524         struct lfsck_instance           *lfsck  = com->lc_lfsck;
525         struct dt_device                *dev    = lfsck->li_bottom;
526         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
527         struct dt_object                *obj;
528
529         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
530         fid->f_oid = lfsck_dev_idx(dev);
531         fid->f_ver = 0;
532         obj = dt_locate(env, dev, fid);
533         if (IS_ERR(obj))
534                 RETURN(PTR_ERR(obj));
535
536         /* Generate an in-RAM object to stand for the layout rbtree.
537          * Scanning the layout rbtree will be via the iteration over
538          * the object. In the future, the rbtree may be written onto
539          * disk with the object.
540          *
541          * Mark the object to be as exist. */
542         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
543         obj->do_index_ops = &lfsck_orphan_index_ops;
544         llsd->llsd_rb_obj = obj;
545         llsd->llsd_rbtree_valid = 1;
546         dev->dd_record_fid_accessed = 1;
547
548         return 0;
549 }
550
551 static void lfsck_rbtree_cleanup(const struct lu_env *env,
552                                  struct lfsck_component *com)
553 {
554         struct lfsck_instance           *lfsck = com->lc_lfsck;
555         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
556         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
557         struct rb_node                  *next;
558         struct lfsck_rbtree_node        *lrn;
559
560         lfsck->li_bottom->dd_record_fid_accessed = 0;
561         /* Invalid the rbtree, then no others will use it. */
562         write_lock(&llsd->llsd_rb_lock);
563         llsd->llsd_rbtree_valid = 0;
564         write_unlock(&llsd->llsd_rb_lock);
565
566         while (node != NULL) {
567                 next = rb_next(node);
568                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
569                 rb_erase(node, &llsd->llsd_rb_root);
570                 lfsck_rbtree_free(lrn);
571                 node = next;
572         }
573
574         if (llsd->llsd_rb_obj != NULL) {
575                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
576                 llsd->llsd_rb_obj = NULL;
577         }
578 }
579
/**
 * Record in the layout rbtree that the object \a fid is known and,
 * optionally, that it has been accessed.
 *
 * The rbtree node covering \a fid is created on demand. Insane FIDs,
 * LAST_ID FIDs, and FIDs that are neither IDIF nor normal are ignored.
 * Nothing is done if the rbtree has already been invalidated.
 *
 * If a new node cannot be allocated while \a accessed is true, the LFSCK
 * is flagged as LF_INCOMPLETE and the whole rbtree is cleaned up. With
 * \a accessed false, such a failure is silently ignored.
 *
 * \param[in] env	execution environment
 * \param[in] com	the layout LFSCK component
 * \param[in] fid	the FID of the object
 * \param[in] accessed	true to also mark the object as accessed
 */
static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com,
                                       const struct lu_fid *fid,
                                       bool accessed)
{
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct lfsck_rbtree_node        *lrn;
        /* Tracks which side of llsd_rb_lock is held at the "unlock" label:
         * false means the read side, true means the write side. */
        bool                             insert = false;
        int                              idx;
        int                              rc     = 0;
        ENTRY;

        CDEBUG(D_LFSCK, "%s: update bitmap for "DFID"\n",
               lfsck_lfsck2name(com->lc_lfsck), PFID(fid));

        if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
                RETURN_EXIT;

        if (!fid_is_idif(fid) && !fid_is_norm(fid))
                RETURN_EXIT;

        read_lock(&llsd->llsd_rb_lock);
        if (!llsd->llsd_rbtree_valid)
                GOTO(unlock, rc = 0);

        /* Exact lookup only: NULL means no node covers this FID yet. */
        lrn = lfsck_rbtree_search(llsd, fid, NULL);
        if (lrn == NULL) {
                struct lfsck_rbtree_node *tmp;

                LASSERT(!insert);

                /* Drop the read lock before allocating; no lock is held
                 * on the failure path that jumps to "out". */
                read_unlock(&llsd->llsd_rb_lock);
                tmp = lfsck_rbtree_new(env, fid);
                if (IS_ERR(tmp))
                        GOTO(out, rc = PTR_ERR(tmp));

                insert = true;
                write_lock(&llsd->llsd_rb_lock);
                /* The rbtree may have been invalidated while no lock was
                 * held; check again. */
                if (!llsd->llsd_rbtree_valid) {
                        lfsck_rbtree_free(tmp);
                        GOTO(unlock, rc = 0);
                }

                /* Somebody else may have inserted a node for the same range
                 * in the meantime; then use theirs and drop ours. */
                lrn = lfsck_rbtree_insert(llsd, tmp);
                if (lrn != tmp)
                        lfsck_rbtree_free(tmp);
        }

        /* Bit index of the object inside the node's bitmaps. */
        idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
        /* Any accessed object must be a known object. */
        if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
                atomic_inc(&lrn->lrn_known_count);
        if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
                atomic_inc(&lrn->lrn_accessed_count);

        GOTO(unlock, rc = 0);

unlock:
        if (insert)
                write_unlock(&llsd->llsd_rb_lock);
        else
                read_unlock(&llsd->llsd_rb_lock);
out:
        if (rc != 0 && accessed) {
                struct lfsck_layout *lo = com->lc_file_ram;

                CERROR("%s: Fail to update object accessed bitmap, will cause "
                       "incorrect LFSCK OST-object handling, so disable it to "
                       "cancel orphan handling for related device. rc = %d.\n",
                       lfsck_lfsck2name(com->lc_lfsck), rc);
                lo->ll_flags |= LF_INCOMPLETE;
                lfsck_rbtree_cleanup(env, com);
        }
}
654
655 static inline bool is_dummy_lov_ost_data(struct lov_ost_data_v1 *obj)
656 {
657         if (fid_is_zero(&obj->l_ost_oi.oi_fid) &&
658             obj->l_ost_gen == 0 && obj->l_ost_idx == 0)
659                 return true;
660
661         return false;
662 }
663
664 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
665                                    const struct lfsck_layout *src)
666 {
667         int i;
668
669         des->ll_magic = le32_to_cpu(src->ll_magic);
670         des->ll_status = le32_to_cpu(src->ll_status);
671         des->ll_flags = le32_to_cpu(src->ll_flags);
672         des->ll_success_count = le32_to_cpu(src->ll_success_count);
673         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
674         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
675         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
676         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
677         des->ll_time_last_checkpoint =
678                                 le64_to_cpu(src->ll_time_last_checkpoint);
679         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
680         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
681         des->ll_pos_first_inconsistent =
682                         le64_to_cpu(src->ll_pos_first_inconsistent);
683         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
684         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
685         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
686         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
687         for (i = 0; i < LLIT_MAX; i++)
688                 des->ll_objs_repaired[i] =
689                                 le64_to_cpu(src->ll_objs_repaired[i]);
690         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
691 }
692
693 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
694                                    const struct lfsck_layout *src)
695 {
696         int i;
697
698         des->ll_magic = cpu_to_le32(src->ll_magic);
699         des->ll_status = cpu_to_le32(src->ll_status);
700         des->ll_flags = cpu_to_le32(src->ll_flags);
701         des->ll_success_count = cpu_to_le32(src->ll_success_count);
702         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
703         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
704         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
705         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
706         des->ll_time_last_checkpoint =
707                                 cpu_to_le64(src->ll_time_last_checkpoint);
708         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
709         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
710         des->ll_pos_first_inconsistent =
711                         cpu_to_le64(src->ll_pos_first_inconsistent);
712         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
713         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
714         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
715         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
716         for (i = 0; i < LLIT_MAX; i++)
717                 des->ll_objs_repaired[i] =
718                                 cpu_to_le64(src->ll_objs_repaired[i]);
719         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
720 }
721
722 /**
723  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
724  * \retval 0: succeed.
725  * \retval -ve: failed cases.
726  */
727 static int lfsck_layout_load(const struct lu_env *env,
728                              struct lfsck_component *com)
729 {
730         struct lfsck_layout             *lo     = com->lc_file_ram;
731         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
732         ssize_t                          size   = com->lc_file_size;
733         loff_t                           pos    = 0;
734         int                              rc;
735
736         rc = dbo->dbo_read(env, com->lc_obj,
737                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
738                            BYPASS_CAPA);
739         if (rc == 0) {
740                 return -ENOENT;
741         } else if (rc < 0) {
742                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
743                       lfsck_lfsck2name(com->lc_lfsck), rc);
744                 return rc;
745         } else if (rc != size) {
746                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
747                       lfsck_lfsck2name(com->lc_lfsck), rc);
748                 return 1;
749         }
750
751         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
752         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
753                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
754                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
755                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
756                 return 1;
757         }
758
759         return 0;
760 }
761
762 static int lfsck_layout_store(const struct lu_env *env,
763                               struct lfsck_component *com)
764 {
765         struct dt_object         *obj           = com->lc_obj;
766         struct lfsck_instance    *lfsck         = com->lc_lfsck;
767         struct lfsck_layout      *lo            = com->lc_file_disk;
768         struct thandle           *handle;
769         ssize_t                   size          = com->lc_file_size;
770         loff_t                    pos           = 0;
771         int                       rc;
772         ENTRY;
773
774         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
775         handle = dt_trans_create(env, lfsck->li_bottom);
776         if (IS_ERR(handle)) {
777                 rc = PTR_ERR(handle);
778                 CERROR("%s: fail to create trans for storing lfsck_layout: "
779                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
780                 RETURN(rc);
781         }
782
783         rc = dt_declare_record_write(env, obj, size, pos, handle);
784         if (rc != 0) {
785                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
786                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
787                 GOTO(out, rc);
788         }
789
790         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
791         if (rc != 0) {
792                 CERROR("%s: fail to start trans for storing lfsck_layout: "
793                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
794                 GOTO(out, rc);
795         }
796
797         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
798                              handle);
799         if (rc != 0)
800                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
801                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
802
803         GOTO(out, rc);
804
805 out:
806         dt_trans_stop(env, lfsck->li_bottom, handle);
807
808         return rc;
809 }
810
811 static int lfsck_layout_init(const struct lu_env *env,
812                              struct lfsck_component *com)
813 {
814         struct lfsck_layout *lo = com->lc_file_ram;
815         int rc;
816
817         memset(lo, 0, com->lc_file_size);
818         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
819         lo->ll_status = LS_INIT;
820         down_write(&com->lc_sem);
821         rc = lfsck_layout_store(env, com);
822         up_write(&com->lc_sem);
823
824         return rc;
825 }
826
827 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
828                              struct dt_object *obj, const struct lu_fid *fid)
829 {
830         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
831         struct lu_seq_range      range  = { 0 };
832         struct lustre_mdt_attrs *lma;
833         int                      rc;
834
835         fld_range_set_any(&range);
836         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
837         if (rc == 0) {
838                 if (fld_range_is_ost(&range))
839                         return 1;
840
841                 return 0;
842         }
843
844         lma = &lfsck_env_info(env)->lti_lma;
845         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
846                           XATTR_NAME_LMA, BYPASS_CAPA);
847         if (rc == sizeof(*lma)) {
848                 lustre_lma_swab(lma);
849
850                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
851         }
852
853         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
854
855         return rc > 0;
856 }
857
858 static struct lfsck_layout_seq *
859 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
860 {
861         struct lfsck_layout_seq *lls;
862
863         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
864                 if (lls->lls_seq == seq)
865                         return lls;
866
867                 if (lls->lls_seq > seq)
868                         return NULL;
869         }
870
871         return NULL;
872 }
873
874 static void
875 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
876                         struct lfsck_layout_seq *lls)
877 {
878         struct lfsck_layout_seq *tmp;
879         struct list_head        *pos = &llsd->llsd_seq_list;
880
881         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
882                 if (lls->lls_seq < tmp->lls_seq) {
883                         pos = &tmp->lls_list;
884                         break;
885                 }
886         }
887         list_add_tail(&lls->lls_list, pos);
888 }
889
890 static int
891 lfsck_layout_lastid_create(const struct lu_env *env,
892                            struct lfsck_instance *lfsck,
893                            struct dt_object *obj)
894 {
895         struct lfsck_thread_info *info   = lfsck_env_info(env);
896         struct lu_attr           *la     = &info->lti_la;
897         struct dt_object_format  *dof    = &info->lti_dof;
898         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
899         struct dt_device         *dt     = lfsck->li_bottom;
900         struct thandle           *th;
901         __u64                     lastid = 0;
902         loff_t                    pos    = 0;
903         int                       rc;
904         ENTRY;
905
906         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
907                fid_seq(lfsck_dto2fid(obj)));
908
909         if (bk->lb_param & LPF_DRYRUN)
910                 return 0;
911
912         memset(la, 0, sizeof(*la));
913         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
914         la->la_valid = LA_MODE | LA_UID | LA_GID;
915         dof->dof_type = dt_mode_to_dft(S_IFREG);
916
917         th = dt_trans_create(env, dt);
918         if (IS_ERR(th))
919                 RETURN(rc = PTR_ERR(th));
920
921         rc = dt_declare_create(env, obj, la, NULL, dof, th);
922         if (rc != 0)
923                 GOTO(stop, rc);
924
925         rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
926         if (rc != 0)
927                 GOTO(stop, rc);
928
929         rc = dt_trans_start_local(env, dt, th);
930         if (rc != 0)
931                 GOTO(stop, rc);
932
933         dt_write_lock(env, obj, 0);
934         if (likely(!dt_object_exists(obj))) {
935                 rc = dt_create(env, obj, la, NULL, dof, th);
936                 if (rc == 0)
937                         rc = dt_record_write(env, obj,
938                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
939                                 &pos, th);
940         }
941         dt_write_unlock(env, obj);
942
943         GOTO(stop, rc);
944
945 stop:
946         dt_trans_stop(env, dt, th);
947
948         return rc;
949 }
950
951 static int
952 lfsck_layout_lastid_reload(const struct lu_env *env,
953                            struct lfsck_component *com,
954                            struct lfsck_layout_seq *lls)
955 {
956         __u64   lastid;
957         loff_t  pos     = 0;
958         int     rc;
959
960         dt_read_lock(env, lls->lls_lastid_obj, 0);
961         rc = dt_record_read(env, lls->lls_lastid_obj,
962                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
963         dt_read_unlock(env, lls->lls_lastid_obj);
964         if (unlikely(rc != 0))
965                 return rc;
966
967         lastid = le64_to_cpu(lastid);
968         if (lastid < lls->lls_lastid_known) {
969                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
970                 struct lfsck_layout     *lo     = com->lc_file_ram;
971
972                 lls->lls_lastid = lls->lls_lastid_known;
973                 lls->lls_dirty = 1;
974                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
975                         LASSERT(lfsck->li_out_notify != NULL);
976
977                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
978                                              LE_LASTID_REBUILDING);
979                         lo->ll_flags |= LF_CRASHED_LASTID;
980                 }
981         } else if (lastid >= lls->lls_lastid) {
982                 lls->lls_lastid = lastid;
983                 lls->lls_dirty = 0;
984         }
985
986         return 0;
987 }
988
989 static int
990 lfsck_layout_lastid_store(const struct lu_env *env,
991                           struct lfsck_component *com)
992 {
993         struct lfsck_instance           *lfsck  = com->lc_lfsck;
994         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
995         struct dt_device                *dt     = lfsck->li_bottom;
996         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
997         struct lfsck_layout_seq         *lls;
998         struct thandle                  *th;
999         __u64                            lastid;
1000         int                              rc     = 0;
1001         int                              rc1    = 0;
1002
1003         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1004                 loff_t pos = 0;
1005
1006                 /* XXX: Add the code back if we really found related
1007                  *      inconsistent cases in the future. */
1008 #if 0
1009                 if (!lls->lls_dirty) {
1010                         /* In OFD, before the pre-creation, the LAST_ID
1011                          * file will be updated firstly, which may hide
1012                          * some potential crashed cases. For example:
1013                          *
1014                          * The old obj1's ID is higher than old LAST_ID
1015                          * but lower than the new LAST_ID, but the LFSCK
1016                          * have not touch the obj1 until the OFD updated
1017                          * the LAST_ID. So the LFSCK does not regard it
1018                          * as crashed case. But when OFD does not create
1019                          * successfully, it will set the LAST_ID as the
1020                          * real created objects' ID, then LFSCK needs to
1021                          * found related inconsistency. */
1022                         rc = lfsck_layout_lastid_reload(env, com, lls);
1023                         if (likely(!lls->lls_dirty))
1024                                 continue;
1025                 }
1026 #endif
1027
1028                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
1029                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
1030
1031                 if (bk->lb_param & LPF_DRYRUN) {
1032                         lls->lls_dirty = 0;
1033                         continue;
1034                 }
1035
1036                 th = dt_trans_create(env, dt);
1037                 if (IS_ERR(th)) {
1038                         rc1 = PTR_ERR(th);
1039                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
1040                                lfsck_lfsck2name(com->lc_lfsck),
1041                                lls->lls_seq, rc1);
1042                         continue;
1043                 }
1044
1045                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1046                                              sizeof(lastid), pos, th);
1047                 if (rc != 0)
1048                         goto stop;
1049
1050                 rc = dt_trans_start_local(env, dt, th);
1051                 if (rc != 0)
1052                         goto stop;
1053
1054                 lastid = cpu_to_le64(lls->lls_lastid);
1055                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1056                 rc = dt_record_write(env, lls->lls_lastid_obj,
1057                                      lfsck_buf_get(env, &lastid,
1058                                      sizeof(lastid)), &pos, th);
1059                 dt_write_unlock(env, lls->lls_lastid_obj);
1060                 if (rc == 0)
1061                         lls->lls_dirty = 0;
1062
1063 stop:
1064                 dt_trans_stop(env, dt, th);
1065                 if (rc != 0) {
1066                         rc1 = rc;
1067                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
1068                                lfsck_lfsck2name(com->lc_lfsck),
1069                                lls->lls_seq, rc1);
1070                 }
1071         }
1072
1073         return rc1;
1074 }
1075
1076 static int
1077 lfsck_layout_lastid_load(const struct lu_env *env,
1078                          struct lfsck_component *com,
1079                          struct lfsck_layout_seq *lls)
1080 {
1081         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1082         struct lfsck_layout     *lo     = com->lc_file_ram;
1083         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1084         struct dt_object        *obj;
1085         loff_t                   pos    = 0;
1086         int                      rc;
1087         ENTRY;
1088
1089         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1090         obj = dt_locate(env, lfsck->li_bottom, fid);
1091         if (IS_ERR(obj))
1092                 RETURN(PTR_ERR(obj));
1093
1094         /* LAST_ID crashed, to be rebuilt */
1095         if (!dt_object_exists(obj)) {
1096                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1097                         LASSERT(lfsck->li_out_notify != NULL);
1098
1099                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1100                                              LE_LASTID_REBUILDING);
1101                         lo->ll_flags |= LF_CRASHED_LASTID;
1102
1103                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1104                             cfs_fail_val > 0) {
1105                                 struct l_wait_info lwi = LWI_TIMEOUT(
1106                                                 cfs_time_seconds(cfs_fail_val),
1107                                                 NULL, NULL);
1108
1109                                 up_write(&com->lc_sem);
1110                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1111                                              !thread_is_running(&lfsck->li_thread),
1112                                              &lwi);
1113                                 down_write(&com->lc_sem);
1114                         }
1115                 }
1116
1117                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1118         } else {
1119                 dt_read_lock(env, obj, 0);
1120                 rc = dt_read(env, obj,
1121                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1122                         &pos);
1123                 dt_read_unlock(env, obj);
1124                 if (rc != 0 && rc != sizeof(__u64))
1125                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1126
1127                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1128                         LASSERT(lfsck->li_out_notify != NULL);
1129
1130                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1131                                              LE_LASTID_REBUILDING);
1132                         lo->ll_flags |= LF_CRASHED_LASTID;
1133                 }
1134
1135                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1136                 rc = 0;
1137         }
1138
1139         GOTO(out, rc);
1140
1141 out:
1142         if (rc != 0)
1143                 lfsck_object_put(env, obj);
1144         else
1145                 lls->lls_lastid_obj = obj;
1146
1147         return rc;
1148 }
1149
1150 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1151                                                struct ptlrpc_request *req,
1152                                                void *args, int rc)
1153 {
1154         struct lfsck_async_interpret_args *laia = args;
1155         struct lfsck_component            *com  = laia->laia_com;
1156         struct lfsck_layout_master_data   *llmd = com->lc_data;
1157         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1158         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1159         struct lfsck_request              *lr   = laia->laia_lr;
1160
1161         switch (lr->lr_event) {
1162         case LE_START:
1163                 if (rc != 0) {
1164                         struct lfsck_layout *lo = com->lc_file_ram;
1165
1166                         CERROR("%s: fail to notify %s %x for layout start: "
1167                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1168                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1169                                ltd->ltd_index, rc);
1170                         lo->ll_flags |= LF_INCOMPLETE;
1171                         break;
1172                 }
1173
1174                 spin_lock(&ltds->ltd_lock);
1175                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1176                         spin_unlock(&ltds->ltd_lock);
1177                         break;
1178                 }
1179
1180                 if (lr->lr_flags & LEF_TO_OST) {
1181                         if (list_empty(&ltd->ltd_layout_list))
1182                                 list_add_tail(&ltd->ltd_layout_list,
1183                                               &llmd->llmd_ost_list);
1184                         if (list_empty(&ltd->ltd_layout_phase_list))
1185                                 list_add_tail(&ltd->ltd_layout_phase_list,
1186                                               &llmd->llmd_ost_phase1_list);
1187                 } else {
1188                         if (list_empty(&ltd->ltd_layout_list))
1189                                 list_add_tail(&ltd->ltd_layout_list,
1190                                               &llmd->llmd_mdt_list);
1191                         if (list_empty(&ltd->ltd_layout_phase_list))
1192                                 list_add_tail(&ltd->ltd_layout_phase_list,
1193                                               &llmd->llmd_mdt_phase1_list);
1194                 }
1195                 spin_unlock(&ltds->ltd_lock);
1196                 break;
1197         case LE_STOP:
1198         case LE_PHASE1_DONE:
1199         case LE_PHASE2_DONE:
1200         case LE_PEER_EXIT:
1201                 if (rc != 0 && rc != -EALREADY)
1202                         CWARN("%s: fail to notify %s %x for layout: "
1203                               "event = %d, rc = %d\n",
1204                               lfsck_lfsck2name(com->lc_lfsck),
1205                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1206                               ltd->ltd_index, lr->lr_event, rc);
1207                 break;
1208         case LE_QUERY: {
1209                 struct lfsck_reply *reply;
1210
1211                 if (rc != 0) {
1212                         spin_lock(&ltds->ltd_lock);
1213                         list_del_init(&ltd->ltd_layout_phase_list);
1214                         list_del_init(&ltd->ltd_layout_list);
1215                         spin_unlock(&ltds->ltd_lock);
1216                         break;
1217                 }
1218
1219                 reply = req_capsule_server_get(&req->rq_pill,
1220                                                &RMF_LFSCK_REPLY);
1221                 if (reply == NULL) {
1222                         rc = -EPROTO;
1223                         CERROR("%s: invalid return value: rc = %d\n",
1224                                lfsck_lfsck2name(com->lc_lfsck), rc);
1225                         spin_lock(&ltds->ltd_lock);
1226                         list_del_init(&ltd->ltd_layout_phase_list);
1227                         list_del_init(&ltd->ltd_layout_list);
1228                         spin_unlock(&ltds->ltd_lock);
1229                         break;
1230                 }
1231
1232                 switch (reply->lr_status) {
1233                 case LS_SCANNING_PHASE1:
1234                         break;
1235                 case LS_SCANNING_PHASE2:
1236                         spin_lock(&ltds->ltd_lock);
1237                         list_del_init(&ltd->ltd_layout_phase_list);
1238                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1239                                 spin_unlock(&ltds->ltd_lock);
1240                                 break;
1241                         }
1242
1243                         if (lr->lr_flags & LEF_TO_OST)
1244                                 list_add_tail(&ltd->ltd_layout_phase_list,
1245                                               &llmd->llmd_ost_phase2_list);
1246                         else
1247                                 list_add_tail(&ltd->ltd_layout_phase_list,
1248                                               &llmd->llmd_mdt_phase2_list);
1249                         spin_unlock(&ltds->ltd_lock);
1250                         break;
1251                 default:
1252                         spin_lock(&ltds->ltd_lock);
1253                         list_del_init(&ltd->ltd_layout_phase_list);
1254                         list_del_init(&ltd->ltd_layout_list);
1255                         spin_unlock(&ltds->ltd_lock);
1256                         break;
1257                 }
1258                 break;
1259         }
1260         default:
1261                 CERROR("%s: unexpected event: rc = %d\n",
1262                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1263                 break;
1264         }
1265
1266         if (!laia->laia_shared) {
1267                 lfsck_tgt_put(ltd);
1268                 lfsck_component_put(env, com);
1269         }
1270
1271         return 0;
1272 }
1273
1274 static int lfsck_layout_master_query_others(const struct lu_env *env,
1275                                             struct lfsck_component *com)
1276 {
1277         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1278         struct lfsck_request              *lr    = &info->lti_lr;
1279         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1280         struct lfsck_instance             *lfsck = com->lc_lfsck;
1281         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1282         struct ptlrpc_request_set         *set;
1283         struct lfsck_tgt_descs            *ltds;
1284         struct lfsck_tgt_desc             *ltd;
1285         struct list_head                  *head;
1286         int                                rc    = 0;
1287         int                                rc1   = 0;
1288         ENTRY;
1289
1290         set = ptlrpc_prep_set();
1291         if (set == NULL)
1292                 RETURN(-ENOMEM);
1293
1294         llmd->llmd_touch_gen++;
1295         memset(lr, 0, sizeof(*lr));
1296         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1297         lr->lr_event = LE_QUERY;
1298         lr->lr_active = LT_LAYOUT;
1299         laia->laia_com = com;
1300         laia->laia_lr = lr;
1301         laia->laia_shared = 0;
1302
1303         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1304                 ltds = &lfsck->li_mdt_descs;
1305                 lr->lr_flags = 0;
1306                 head = &llmd->llmd_mdt_phase1_list;
1307         } else {
1308
1309 again:
1310                 ltds = &lfsck->li_ost_descs;
1311                 lr->lr_flags = LEF_TO_OST;
1312                 head = &llmd->llmd_ost_phase1_list;
1313         }
1314
1315         laia->laia_ltds = ltds;
1316         spin_lock(&ltds->ltd_lock);
1317         while (!list_empty(head)) {
1318                 ltd = list_entry(head->next,
1319                                  struct lfsck_tgt_desc,
1320                                  ltd_layout_phase_list);
1321                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1322                         break;
1323
1324                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1325                 list_del(&ltd->ltd_layout_phase_list);
1326                 list_add_tail(&ltd->ltd_layout_phase_list, head);
1327                 atomic_inc(&ltd->ltd_ref);
1328                 laia->laia_ltd = ltd;
1329                 spin_unlock(&ltds->ltd_lock);
1330                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1331                                          lfsck_layout_master_async_interpret,
1332                                          laia, LFSCK_QUERY);
1333                 if (rc != 0) {
1334                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
1335                                lfsck_lfsck2name(lfsck),
1336                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1337                                ltd->ltd_index, rc);
1338                         lfsck_tgt_put(ltd);
1339                         rc1 = rc;
1340                 }
1341                 spin_lock(&ltds->ltd_lock);
1342         }
1343         spin_unlock(&ltds->ltd_lock);
1344
1345         rc = ptlrpc_set_wait(set);
1346         if (rc < 0) {
1347                 ptlrpc_set_destroy(set);
1348                 RETURN(rc);
1349         }
1350
1351         if (!(lr->lr_flags & LEF_TO_OST) &&
1352             list_empty(&llmd->llmd_mdt_phase1_list))
1353                 goto again;
1354
1355         ptlrpc_set_destroy(set);
1356
1357         RETURN(rc1 != 0 ? rc1 : rc);
1358 }
1359
1360 static inline bool
1361 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1362 {
1363         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1364                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1365                 list_empty(&llmd->llmd_ost_phase1_list));
1366 }
1367
1368 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1369                                              struct lfsck_component *com,
1370                                              struct lfsck_request *lr)
1371 {
1372         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1373         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1374         struct lfsck_instance             *lfsck = com->lc_lfsck;
1375         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1376         struct lfsck_layout               *lo    = com->lc_file_ram;
1377         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1378         struct ptlrpc_request_set         *set;
1379         struct lfsck_tgt_descs            *ltds;
1380         struct lfsck_tgt_desc             *ltd;
1381         struct lfsck_tgt_desc             *next;
1382         struct list_head                  *head;
1383         __u32                              idx;
1384         int                                rc    = 0;
1385         ENTRY;
1386
1387         set = ptlrpc_prep_set();
1388         if (set == NULL)
1389                 RETURN(-ENOMEM);
1390
1391         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1392         lr->lr_active = LT_LAYOUT;
1393         laia->laia_com = com;
1394         laia->laia_lr = lr;
1395         laia->laia_shared = 0;
1396         switch (lr->lr_event) {
1397         case LE_START:
1398                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1399                 ltds = &lfsck->li_ost_descs;
1400                 laia->laia_ltds = ltds;
1401                 down_read(&ltds->ltd_rw_sem);
1402                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1403                         ltd = lfsck_tgt_get(ltds, idx);
1404                         LASSERT(ltd != NULL);
1405
1406                         laia->laia_ltd = ltd;
1407                         ltd->ltd_layout_done = 0;
1408                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1409                                         lfsck_layout_master_async_interpret,
1410                                         laia, LFSCK_NOTIFY);
1411                         if (rc != 0) {
1412                                 CERROR("%s: fail to notify %s %x for layout "
1413                                        "start: rc = %d\n",
1414                                        lfsck_lfsck2name(lfsck),
1415                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1416                                        "MDT", idx, rc);
1417                                 lfsck_tgt_put(ltd);
1418                                 lo->ll_flags |= LF_INCOMPLETE;
1419                         }
1420                 }
1421                 up_read(&ltds->ltd_rw_sem);
1422
1423                 /* Sync up */
1424                 rc = ptlrpc_set_wait(set);
1425                 if (rc < 0) {
1426                         ptlrpc_set_destroy(set);
1427                         RETURN(rc);
1428                 }
1429
1430                 if (!(bk->lb_param & LPF_ALL_TGT))
1431                         break;
1432
1433                 /* link other MDT targets locallly. */
1434                 spin_lock(&ltds->ltd_lock);
1435                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1436                         ltd = LTD_TGT(ltds, idx);
1437                         LASSERT(ltd != NULL);
1438
1439                         if (!list_empty(&ltd->ltd_layout_list))
1440                                 continue;
1441
1442                         list_add_tail(&ltd->ltd_layout_list,
1443                                       &llmd->llmd_mdt_list);
1444                         list_add_tail(&ltd->ltd_layout_phase_list,
1445                                       &llmd->llmd_mdt_phase1_list);
1446                 }
1447                 spin_unlock(&ltds->ltd_lock);
1448                 break;
1449         case LE_STOP:
1450         case LE_PHASE2_DONE:
1451         case LE_PEER_EXIT: {
1452                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1453                 if (bk->lb_param & LPF_ALL_TGT) {
1454                         head = &llmd->llmd_mdt_list;
1455                         ltds = &lfsck->li_mdt_descs;
1456                         if (lr->lr_event == LE_STOP) {
1457                                 /* unlink other MDT targets locallly. */
1458                                 spin_lock(&ltds->ltd_lock);
1459                                 list_for_each_entry_safe(ltd, next, head,
1460                                                          ltd_layout_list) {
1461                                         list_del_init(&ltd->ltd_layout_phase_list);
1462                                         list_del_init(&ltd->ltd_layout_list);
1463                                 }
1464                                 spin_unlock(&ltds->ltd_lock);
1465
1466                                 lr->lr_flags |= LEF_TO_OST;
1467                                 head = &llmd->llmd_ost_list;
1468                                 ltds = &lfsck->li_ost_descs;
1469                         } else {
1470                                 lr->lr_flags &= ~LEF_TO_OST;
1471                         }
1472                 } else {
1473                         lr->lr_flags |= LEF_TO_OST;
1474                         head = &llmd->llmd_ost_list;
1475                         ltds = &lfsck->li_ost_descs;
1476                 }
1477
1478 again:
1479                 laia->laia_ltds = ltds;
1480                 spin_lock(&ltds->ltd_lock);
1481                 while (!list_empty(head)) {
1482                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1483                                          ltd_layout_list);
1484                         if (!list_empty(&ltd->ltd_layout_phase_list))
1485                                 list_del_init(&ltd->ltd_layout_phase_list);
1486                         list_del_init(&ltd->ltd_layout_list);
1487                         atomic_inc(&ltd->ltd_ref);
1488                         laia->laia_ltd = ltd;
1489                         spin_unlock(&ltds->ltd_lock);
1490                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1491                                         lfsck_layout_master_async_interpret,
1492                                         laia, LFSCK_NOTIFY);
1493                         if (rc != 0) {
1494                                 CERROR("%s: fail to notify %s %x for layout "
1495                                        "stop/phase2: rc = %d\n",
1496                                        lfsck_lfsck2name(lfsck),
1497                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1498                                        "MDT", ltd->ltd_index, rc);
1499                                 lfsck_tgt_put(ltd);
1500                         }
1501                         spin_lock(&ltds->ltd_lock);
1502                 }
1503                 spin_unlock(&ltds->ltd_lock);
1504
1505                 rc = ptlrpc_set_wait(set);
1506                 if (rc < 0) {
1507                         ptlrpc_set_destroy(set);
1508                         RETURN(rc);
1509                 }
1510
1511                 if (!(lr->lr_flags & LEF_TO_OST)) {
1512                         lr->lr_flags |= LEF_TO_OST;
1513                         head = &llmd->llmd_ost_list;
1514                         ltds = &lfsck->li_ost_descs;
1515                         goto again;
1516                 }
1517                 break;
1518         }
1519         case LE_PHASE1_DONE:
1520                 llmd->llmd_touch_gen++;
1521                 ltds = &lfsck->li_mdt_descs;
1522                 laia->laia_ltds = ltds;
1523                 spin_lock(&ltds->ltd_lock);
1524                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1525                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1526                                          struct lfsck_tgt_desc,
1527                                          ltd_layout_phase_list);
1528                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1529                                 break;
1530
1531                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1532                         list_del_init(&ltd->ltd_layout_phase_list);
1533                         list_add_tail(&ltd->ltd_layout_phase_list,
1534                                       &llmd->llmd_mdt_phase1_list);
1535                         atomic_inc(&ltd->ltd_ref);
1536                         laia->laia_ltd = ltd;
1537                         spin_unlock(&ltds->ltd_lock);
1538                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1539                                         lfsck_layout_master_async_interpret,
1540                                         laia, LFSCK_NOTIFY);
1541                         if (rc != 0) {
1542                                 CERROR("%s: fail to notify MDT %x for layout "
1543                                        "phase1 done: rc = %d\n",
1544                                        lfsck_lfsck2name(lfsck),
1545                                        ltd->ltd_index, rc);
1546                                 lfsck_tgt_put(ltd);
1547                         }
1548                         spin_lock(&ltds->ltd_lock);
1549                 }
1550                 spin_unlock(&ltds->ltd_lock);
1551                 break;
1552         default:
1553                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1554                        lfsck_lfsck2name(lfsck), lr->lr_event);
1555                 rc = -EINVAL;
1556                 break;
1557         }
1558
1559         rc = ptlrpc_set_wait(set);
1560         ptlrpc_set_destroy(set);
1561
1562         RETURN(rc);
1563 }
1564
1565 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1566                                            struct lfsck_component *com,
1567                                            int rc)
1568 {
1569         struct lfsck_instance   *lfsck = com->lc_lfsck;
1570         struct lfsck_layout     *lo    = com->lc_file_ram;
1571         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1572
1573         down_write(&com->lc_sem);
1574
1575         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1576                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1577         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1578         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1579
1580         if (rc > 0) {
1581                 com->lc_journal = 0;
1582                 if (lo->ll_flags & LF_INCOMPLETE)
1583                         lo->ll_status = LS_PARTIAL;
1584                 else
1585                         lo->ll_status = LS_COMPLETED;
1586                 if (!(bk->lb_param & LPF_DRYRUN))
1587                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1588                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1589                 lo->ll_success_count++;
1590         } else if (rc == 0) {
1591                 lo->ll_status = lfsck->li_status;
1592                 if (lo->ll_status == 0)
1593                         lo->ll_status = LS_STOPPED;
1594         } else {
1595                 lo->ll_status = LS_FAILED;
1596         }
1597
1598         if (lo->ll_status != LS_PAUSED) {
1599                 spin_lock(&lfsck->li_lock);
1600                 list_del_init(&com->lc_link);
1601                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1602                 spin_unlock(&lfsck->li_lock);
1603         }
1604
1605         rc = lfsck_layout_store(env, com);
1606
1607         up_write(&com->lc_sem);
1608
1609         return rc;
1610 }
1611
1612 static int lfsck_layout_lock(const struct lu_env *env,
1613                              struct lfsck_component *com,
1614                              struct dt_object *obj,
1615                              struct lustre_handle *lh, __u64 bits)
1616 {
1617         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1618         ldlm_policy_data_t              *policy = &info->lti_policy;
1619         struct ldlm_res_id              *resid  = &info->lti_resid;
1620         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1621         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1622         int                              rc;
1623
1624         LASSERT(lfsck->li_namespace != NULL);
1625
1626         memset(policy, 0, sizeof(*policy));
1627         policy->l_inodebits.bits = bits;
1628         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1629         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1630                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1631                                     ldlm_completion_ast, NULL, NULL, 0,
1632                                     LVB_T_NONE, NULL, lh);
1633         if (rc == ELDLM_OK) {
1634                 rc = 0;
1635         } else {
1636                 memset(lh, 0, sizeof(*lh));
1637                 rc = -EIO;
1638         }
1639
1640         return rc;
1641 }
1642
1643 static void lfsck_layout_unlock(struct lustre_handle *lh)
1644 {
1645         if (lustre_handle_is_used(lh)) {
1646                 ldlm_lock_decref(lh, LCK_EX);
1647                 memset(lh, 0, sizeof(*lh));
1648         }
1649 }
1650
1651 static int lfsck_layout_trans_stop(const struct lu_env *env,
1652                                    struct dt_device *dev,
1653                                    struct thandle *handle, int result)
1654 {
1655         int rc;
1656
1657         handle->th_result = result;
1658         rc = dt_trans_stop(env, dev, handle);
1659         if (rc > 0)
1660                 rc = 0;
1661         else if (rc == 0)
1662                 rc = 1;
1663
1664         return rc;
1665 }
1666
1667 /**
1668  * \retval       +1: repaired
1669  * \retval        0: did nothing
1670  * \retval      -ve: on error
1671  */
1672 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1673                                      struct thandle *handle,
1674                                      struct dt_object *parent,
1675                                      struct lu_fid *cfid,
1676                                      struct lu_buf *buf,
1677                                      struct lov_ost_data_v1 *slot,
1678                                      int fl, __u32 ost_idx)
1679 {
1680         struct ost_id   *oi     = &lfsck_env_info(env)->lti_oi;
1681         int              rc;
1682
1683         fid_to_ostid(cfid, oi);
1684         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1685         slot->l_ost_gen = cpu_to_le32(0);
1686         slot->l_ost_idx = cpu_to_le32(ost_idx);
1687         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
1688                           BYPASS_CAPA);
1689         if (rc == 0)
1690                 rc = 1;
1691
1692         return rc;
1693 }
1694
1695 /**
1696  * \retval       +1: repaired
1697  * \retval        0: did nothing
1698  * \retval      -ve: on error
1699  */
1700 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1701                                      struct thandle *handle,
1702                                      struct dt_object *parent,
1703                                      struct lu_fid *cfid,
1704                                      struct lu_buf *buf, int fl,
1705                                      __u32 ost_idx, __u32 ea_off)
1706 {
1707         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1708         struct lov_ost_data_v1  *objs;
1709         int                      rc;
1710         ENTRY;
1711
1712         if (fl == LU_XATTR_CREATE) {
1713                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1,
1714                                                        LOV_MAGIC_V1));
1715
1716                 memset(lmm, 0, buf->lb_len);
1717                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1718                 /* XXX: currently, we only support LOV_PATTERN_RAID0. */
1719                 lmm->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
1720                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1721                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1722                 /* XXX: We cannot know the stripe size,
1723                  *      then use the default value (1 MB). */
1724                 lmm->lmm_stripe_size = cpu_to_le32(1024 * 1024);
1725                 lmm->lmm_layout_gen = cpu_to_le16(0);
1726                 objs = &(lmm->lmm_objects[ea_off]);
1727         } else {
1728                 __u16   count = le16_to_cpu(lmm->lmm_stripe_count);
1729                 int     gap   = ea_off - count;
1730                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1731
1732                 /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3
1733                  * which has been verified in lfsck_layout_verify_header()
1734                  * already. If some new magic introduced in the future,
1735                  * then layout LFSCK needs to be updated also. */
1736                 if (magic == LOV_MAGIC_V1) {
1737                         objs = &(lmm->lmm_objects[count]);
1738                 } else {
1739                         LASSERT(magic == LOV_MAGIC_V3);
1740                         objs = &((struct lov_mds_md_v3 *)lmm)->
1741                                                         lmm_objects[count];
1742                 }
1743
1744                 if (gap > 0)
1745                         memset(objs, 0, gap * sizeof(*objs));
1746                 lmm->lmm_layout_gen =
1747                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1748                 objs += gap;
1749
1750                 LASSERT(buf->lb_len == lov_mds_md_size(ea_off + 1, magic));
1751         }
1752
1753         lmm->lmm_stripe_count = cpu_to_le16(ea_off + 1);
1754         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1755                                        fl, ost_idx);
1756
1757         RETURN(rc);
1758 }
1759
1760 /**
1761  * \retval       +1: repaired
1762  * \retval        0: did nothing
1763  * \retval      -ve: on error
1764  */
1765 static int lfsck_layout_update_pfid(const struct lu_env *env,
1766                                     struct lfsck_component *com,
1767                                     struct dt_object *parent,
1768                                     struct lu_fid *cfid,
1769                                     struct dt_device *cdev, __u32 ea_off)
1770 {
1771         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1772         struct dt_object        *child;
1773         struct thandle          *handle;
1774         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1775         struct lu_buf           *buf;
1776         int                      rc     = 0;
1777         ENTRY;
1778
1779         child = lfsck_object_find_by_dev(env, cdev, cfid);
1780         if (IS_ERR(child))
1781                 RETURN(PTR_ERR(child));
1782
1783         handle = dt_trans_create(env, cdev);
1784         if (IS_ERR(handle))
1785                 GOTO(out, rc = PTR_ERR(handle));
1786
1787         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1788         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1789         /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
1790          * instead, it is the OST-object index in its parent MDT-object
1791          * layout EA. */
1792         pfid->ff_parent.f_ver = cpu_to_le32(ea_off);
1793         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1794
1795         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1796         if (rc != 0)
1797                 GOTO(stop, rc);
1798
1799         rc = dt_trans_start(env, cdev, handle);
1800         if (rc != 0)
1801                 GOTO(stop, rc);
1802
1803         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1804                           BYPASS_CAPA);
1805
1806         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1807
1808 stop:
1809         dt_trans_stop(env, cdev, handle);
1810
1811 out:
1812         lu_object_put(env, &child->do_lu);
1813
1814         return rc;
1815 }
1816
1817 /**
1818  * \retval       +1: repaired
1819  * \retval        0: did nothing
1820  * \retval      -ve: on error
1821  */
1822 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1823                                         struct lfsck_component *com,
1824                                         struct lfsck_tgt_desc *ltd,
1825                                         struct lu_orphan_rec *rec,
1826                                         struct lu_fid *cfid,
1827                                         const char *prefix,
1828                                         const char *postfix,
1829                                         __u32 ea_off)
1830 {
1831         /* XXX: To be extended in other patch. */
1832         return 0;
1833 }
1834
1835 /**
1836  * \retval       +1: repaired
1837  * \retval        0: did nothing
1838  * \retval      -ve: on error
1839  */
1840 static int lfsck_layout_conflict_create(const struct lu_env *env,
1841                                         struct lfsck_component *com,
1842                                         struct lfsck_tgt_desc *ltd,
1843                                         struct lu_orphan_rec *rec,
1844                                         struct dt_object *parent,
1845                                         struct lu_fid *cfid,
1846                                         struct lu_buf *ea_buf,
1847                                         struct lov_ost_data_v1 *slot,
1848                                         __u32 ea_off, __u32 ori_len)
1849 {
1850         /* XXX: To be extended in other patch. */
1851         return 0;
1852 }
1853
1854 /**
1855  * \retval       +1: repaired
1856  * \retval        0: did nothing
1857  * \retval      -ve: on error
1858  */
1859 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
1860                                        struct lfsck_component *com,
1861                                        struct lfsck_tgt_desc *ltd,
1862                                        struct lu_orphan_rec *rec,
1863                                        struct dt_object *parent,
1864                                        struct lu_fid *cfid,
1865                                        __u32 ost_idx, __u32 ea_off)
1866 {
1867         struct lfsck_thread_info *info          = lfsck_env_info(env);
1868         struct lu_buf            *buf           = &info->lti_big_buf;
1869         struct lu_fid            *fid           = &info->lti_fid2;
1870         struct ost_id            *oi            = &info->lti_oi;
1871         struct lfsck_instance    *lfsck         = com->lc_lfsck;
1872         struct dt_device         *dt            = lfsck->li_bottom;
1873         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
1874         struct thandle            *handle       = NULL;
1875         size_t                    buflen        = buf->lb_len;
1876         struct lov_mds_md_v1     *lmm;
1877         struct lov_ost_data_v1   *objs;
1878         struct lustre_handle      lh            = { 0 };
1879         __u32                     magic;
1880         int                       fl            = 0;
1881         int                       rc;
1882         int                       rc1;
1883         int                       i;
1884         __u16                     count;
1885         ENTRY;
1886
1887         CDEBUG(D_LFSCK, "Re-create the crashed layout EA: parent "
1888                DFID", child "DFID", OST-index %u, stripe-index %u\n",
1889                PFID(lfsck_dto2fid(parent)), PFID(cfid), ost_idx, ea_off);
1890
1891         rc = lfsck_layout_lock(env, com, parent, &lh,
1892                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
1893         if (rc != 0)
1894                 RETURN(rc);
1895
1896 again:
1897         if (!(bk->lb_param & LPF_DRYRUN)) {
1898                 handle = dt_trans_create(env, dt);
1899                 if (IS_ERR(handle))
1900                         GOTO(unlock_layout, rc = PTR_ERR(handle));
1901
1902                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
1903                                           fl, handle);
1904                 if (rc != 0)
1905                         GOTO(stop, rc);
1906
1907                 rc = dt_trans_start_local(env, dt, handle);
1908                 if (rc != 0)
1909                         GOTO(stop, rc);
1910         }
1911
1912         dt_write_lock(env, parent, 0);
1913         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
1914         if (rc == -ERANGE) {
1915                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
1916                                   BYPASS_CAPA);
1917                 LASSERT(rc != 0);
1918
1919                 dt_write_unlock(env, parent);
1920                 if (handle != NULL) {
1921                         dt_trans_stop(env, dt, handle);
1922                         handle = NULL;
1923                 }
1924
1925                 if (rc < 0)
1926                         GOTO(unlock_layout, rc);
1927
1928                 lu_buf_realloc(buf, rc);
1929                 buflen = buf->lb_len;
1930                 if (buf->lb_buf == NULL)
1931                         GOTO(unlock_layout, rc = -ENOMEM);
1932
1933                 fl = LU_XATTR_REPLACE;
1934                 goto again;
1935         } else if (rc == -ENODATA || rc == 0) {
1936                 fl = LU_XATTR_CREATE;
1937         } else if (rc < 0) {
1938                 GOTO(unlock_parent, rc);
1939         } else if (unlikely(buf->lb_len == 0)) {
1940                 dt_write_unlock(env, parent);
1941                 if (handle != NULL) {
1942                         dt_trans_stop(env, dt, handle);
1943                         handle = NULL;
1944                 }
1945
1946                 lu_buf_alloc(buf, rc);
1947                 buflen = buf->lb_len;
1948                 if (buf->lb_buf == NULL)
1949                         GOTO(unlock_layout, rc = -ENOMEM);
1950
1951                 fl = LU_XATTR_REPLACE;
1952                 goto again;
1953         } else {
1954                 fl = LU_XATTR_REPLACE;
1955         }
1956
1957         if (fl == LU_XATTR_CREATE) {
1958                 if (bk->lb_param & LPF_DRYRUN)
1959                         GOTO(unlock_parent, rc = 1);
1960
1961                 rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
1962                 /* If the declared is not big enough, re-try. */
1963                 if (buf->lb_len < rc) {
1964                         dt_write_unlock(env, parent);
1965                         if (handle != NULL) {
1966                                 dt_trans_stop(env, dt, handle);
1967                                 handle = NULL;
1968                         }
1969
1970                         lu_buf_realloc(buf, rc);
1971                         buflen = buf->lb_len;
1972                         if (buf->lb_buf == NULL)
1973                                 GOTO(unlock_layout, rc = -ENOMEM);
1974
1975                         goto again;
1976                 }
1977
1978                 buf->lb_len = rc;
1979                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
1980                                                fl, ost_idx, ea_off);
1981
1982                 GOTO(unlock_parent, rc);
1983         }
1984
1985         lmm = buf->lb_buf;
1986         rc1 = lfsck_layout_verify_header(lmm);
1987         if (rc1 != 0)
1988                 GOTO(unlock_parent, rc = rc1);
1989
1990         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
1991          * been verified in lfsck_layout_verify_header() already. If some
1992          * new magic introduced in the future, then layout LFSCK needs to
1993          * be updated also. */
1994         magic = le32_to_cpu(lmm->lmm_magic);
1995         if (magic == LOV_MAGIC_V1) {
1996                 objs = &(lmm->lmm_objects[0]);
1997         } else {
1998                 LASSERT(magic == LOV_MAGIC_V3);
1999                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2000         }
2001
2002         count = le16_to_cpu(lmm->lmm_stripe_count);
2003         if (count == 0)
2004                 GOTO(unlock_parent, rc = -EINVAL);
2005         LASSERT(count > 0);
2006
2007         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2008         if (count <= ea_off) {
2009                 if (bk->lb_param & LPF_DRYRUN)
2010                         GOTO(unlock_parent, rc = 1);
2011
2012                 rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2013                 /* If the declared is not big enough, re-try. */
2014                 if (buf->lb_len < rc) {
2015                         dt_write_unlock(env, parent);
2016                         if (handle != NULL) {
2017                                 dt_trans_stop(env, dt, handle);
2018                                 handle = NULL;
2019                         }
2020
2021                         lu_buf_realloc(buf, rc);
2022                         buflen = buf->lb_len;
2023                         if (buf->lb_buf == NULL)
2024                                 GOTO(unlock_layout, rc = -ENOMEM);
2025
2026                         goto again;
2027                 }
2028
2029                 buf->lb_len = rc;
2030                 rc = lfsck_layout_extend_lovea(env, handle, parent, cfid, buf,
2031                                                fl, ost_idx, ea_off);
2032                 GOTO(unlock_parent, rc);
2033         }
2034
2035         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2036
2037         buf->lb_len = rc;
2038         for (i = 0; i < count; i++, objs++) {
2039                 /* The MDT-object was created via lfsck_layout_recover_create()
2040                  * by others before, and we fill the dummy layout EA. */
2041                 if (is_dummy_lov_ost_data(objs)) {
2042                         if (i != ea_off)
2043                                 continue;
2044
2045                         if (bk->lb_param & LPF_DRYRUN)
2046                                 GOTO(unlock_parent, rc = 1);
2047
2048                         lmm->lmm_layout_gen =
2049                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2050                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2051                                                        cfid, buf, objs, fl,
2052                                                        ost_idx);
2053                         GOTO(unlock_parent, rc);
2054                 }
2055
2056                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2057                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2058                 /* It should be rare case, the slot is there, but the LFSCK
2059                  * does not handle it during the first-phase cycle scanning. */
2060                 if (unlikely(lu_fid_eq(fid, cfid))) {
2061                         if (i == ea_off) {
2062                                 GOTO(unlock_parent, rc = 0);
2063                         } else {
2064                                 /* Rare case that the OST-object index
2065                                  * does not match the parent MDT-object
2066                                  * layout EA. We trust the later one. */
2067                                 if (bk->lb_param & LPF_DRYRUN)
2068                                         GOTO(unlock_parent, rc = 1);
2069
2070                                 dt_write_unlock(env, parent);
2071                                 if (handle != NULL)
2072                                         dt_trans_stop(env, dt, handle);
2073                                 lfsck_layout_unlock(&lh);
2074                                 buf->lb_len = buflen;
2075                                 rc = lfsck_layout_update_pfid(env, com, parent,
2076                                                         cfid, ltd->ltd_tgt, i);
2077
2078                                 RETURN(rc);
2079                         }
2080                 }
2081         }
2082
2083         /* The MDT-object exists, but related layout EA slot is occupied
2084          * by others. */
2085         if (bk->lb_param & LPF_DRYRUN)
2086                 GOTO(unlock_parent, rc = 1);
2087
2088         dt_write_unlock(env, parent);
2089         if (handle != NULL)
2090                 dt_trans_stop(env, dt, handle);
2091         lfsck_layout_unlock(&lh);
2092         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2093                 objs = &(lmm->lmm_objects[ea_off]);
2094         else
2095                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2096         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2097                                           buf, objs, ea_off, buflen);
2098
2099         RETURN(rc);
2100
2101 unlock_parent:
2102         dt_write_unlock(env, parent);
2103
2104 stop:
2105         if (handle != NULL)
2106                 dt_trans_stop(env, dt, handle);
2107
2108 unlock_layout:
2109         lfsck_layout_unlock(&lh);
2110         buf->lb_len = buflen;
2111
2112         return rc;
2113 }
2114
2115 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2116                                         struct lfsck_component *com,
2117                                         struct lfsck_tgt_desc *ltd,
2118                                         struct lu_orphan_rec *rec,
2119                                         struct lu_fid *cfid)
2120 {
2121         struct lfsck_layout     *lo     = com->lc_file_ram;
2122         struct lu_fid           *pfid   = &rec->lor_fid;
2123         struct dt_object        *parent = NULL;
2124         __u32                    ea_off = pfid->f_ver;
2125         int                      rc     = 0;
2126         ENTRY;
2127
2128         if (!fid_is_sane(cfid))
2129                 GOTO(out, rc = -EINVAL);
2130
2131         if (fid_is_zero(pfid)) {
2132                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2133                                                   "N-", "", ea_off);
2134                 GOTO(out, rc);
2135         }
2136
2137         pfid->f_ver = 0;
2138         if (!fid_is_sane(pfid))
2139                 GOTO(out, rc = -EINVAL);
2140
2141         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2142         if (IS_ERR(parent))
2143                 GOTO(out, rc = PTR_ERR(parent));
2144
2145         if (unlikely(dt_object_remote(parent) != 0))
2146                 GOTO(put, rc = -EXDEV);
2147
2148         if (dt_object_exists(parent) == 0) {
2149                 lu_object_put(env, &parent->do_lu);
2150                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2151                                                   "R-", "", ea_off);
2152                 GOTO(out, rc);
2153         }
2154
2155         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2156                 GOTO(put, rc = -EISDIR);
2157
2158         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2159                                          ltd->ltd_index, ea_off);
2160
2161         GOTO(put, rc);
2162
2163 put:
2164         if (rc <= 0)
2165                 lu_object_put(env, &parent->do_lu);
2166         else
2167                 /* The layout EA is changed, need to be reloaded next time. */
2168                 lu_object_put_nocache(env, &parent->do_lu);
2169
2170 out:
2171         down_write(&com->lc_sem);
2172         com->lc_new_scanned++;
2173         com->lc_new_checked++;
2174         if (rc > 0) {
2175                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2176                 rc = 0;
2177         } else if (rc < 0) {
2178                 lo->ll_objs_failed_phase2++;
2179         }
2180         up_write(&com->lc_sem);
2181
2182         return rc;
2183 }
2184
2185 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2186                                     struct lfsck_component *com,
2187                                     struct lfsck_tgt_desc *ltd)
2188 {
2189         struct lfsck_layout             *lo     = com->lc_file_ram;
2190         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2191         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2192         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2193         struct ost_id                   *oi     = &info->lti_oi;
2194         struct lu_fid                   *fid    = &info->lti_fid;
2195         struct dt_object                *obj;
2196         const struct dt_it_ops          *iops;
2197         struct dt_it                    *di;
2198         int                              rc     = 0;
2199         ENTRY;
2200
2201         CDEBUG(D_LFSCK, "%s: start the orphan scanning for OST%04x\n",
2202                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2203
2204         ostid_set_seq(oi, FID_SEQ_IDIF);
2205         ostid_set_id(oi, 0);
2206         ostid_to_fid(fid, oi, ltd->ltd_index);
2207         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2208         if (unlikely(IS_ERR(obj)))
2209                 RETURN(PTR_ERR(obj));
2210
2211         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2212         if (rc != 0)
2213                 GOTO(put, rc);
2214
2215         iops = &obj->do_index_ops->dio_it;
2216         di = iops->init(env, obj, 0, BYPASS_CAPA);
2217         if (IS_ERR(di))
2218                 GOTO(put, rc = PTR_ERR(di));
2219
2220         rc = iops->load(env, di, 0);
2221         if (rc == -ESRCH) {
2222                 /* -ESRCH means that the orphan OST-objects rbtree has been
2223                  * cleanup because of the OSS server restart or other errors. */
2224                 lo->ll_flags |= LF_INCOMPLETE;
2225                 GOTO(fini, rc);
2226         }
2227
2228         if (rc == 0)
2229                 rc = iops->next(env, di);
2230         else if (rc > 0)
2231                 rc = 0;
2232
2233         if (rc < 0)
2234                 GOTO(fini, rc);
2235
2236         if (rc > 0)
2237                 GOTO(fini, rc = 0);
2238
2239         do {
2240                 struct dt_key           *key;
2241                 struct lu_orphan_rec    *rec = &info->lti_rec;
2242
2243                 key = iops->key(env, di);
2244                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2245                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2246                 if (rc == 0)
2247                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2248                                         &com->lc_fid_latest_scanned_phase2);
2249                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2250                         GOTO(fini, rc);
2251
2252                 lfsck_control_speed_by_self(com);
2253                 do {
2254                         rc = iops->next(env, di);
2255                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2256         } while (rc == 0);
2257
2258         GOTO(fini, rc);
2259
2260 fini:
2261         iops->put(env, di);
2262         iops->fini(env, di);
2263 put:
2264         lu_object_put(env, &obj->do_lu);
2265
2266         CDEBUG(D_LFSCK, "%s: finish the orphan scanning for OST%04x, rc = %d\n",
2267                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2268
2269         return rc > 0 ? 0 : rc;
2270 }
2271
2272 /* For the MDT-object with dangling reference, we need to re-create
2273  * the missed OST-object with the known FID/owner information. */
2274 static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
2275                                         struct lfsck_component *com,
2276                                         struct lfsck_layout_req *llr,
2277                                         struct lu_attr *la)
2278 {
2279         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2280         struct filter_fid               *pfid   = &info->lti_new_pfid;
2281         struct dt_allocation_hint       *hint   = &info->lti_hint;
2282         struct dt_object                *parent = llr->llr_parent->llo_obj;
2283         struct dt_object                *child  = llr->llr_child;
2284         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2285         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2286         struct thandle                  *handle;
2287         struct lu_buf                   *buf;
2288         struct lustre_handle             lh     = { 0 };
2289         int                              rc;
2290         ENTRY;
2291
2292         CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
2293                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2294                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2295                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
2296
2297         rc = lfsck_layout_lock(env, com, parent, &lh,
2298                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2299         if (rc != 0)
2300                 RETURN(rc);
2301
2302         handle = dt_trans_create(env, dev);
2303         if (IS_ERR(handle))
2304                 GOTO(unlock1, rc = PTR_ERR(handle));
2305
2306         hint->dah_parent = NULL;
2307         hint->dah_mode = 0;
2308         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2309         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2310         pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
2311         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2312
2313         rc = dt_declare_create(env, child, la, hint, NULL, handle);
2314         if (rc != 0)
2315                 GOTO(stop, rc);
2316
2317         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2318                                   LU_XATTR_CREATE, handle);
2319         if (rc != 0)
2320                 GOTO(stop, rc);
2321
2322         rc = dt_trans_start(env, dev, handle);
2323         if (rc != 0)
2324                 GOTO(stop, rc);
2325
2326         dt_read_lock(env, parent, 0);
2327         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2328                 GOTO(unlock2, rc = 1);
2329
2330         rc = dt_create(env, child, la, hint, NULL, handle);
2331         if (rc != 0)
2332                 GOTO(unlock2, rc);
2333
2334         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2335                           handle, BYPASS_CAPA);
2336
2337         GOTO(unlock2, rc);
2338
2339 unlock2:
2340         dt_read_unlock(env, parent);
2341
2342 stop:
2343         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2344
2345 unlock1:
2346         lfsck_layout_unlock(&lh);
2347
2348         return rc;
2349 }
2350
2351 /* If the OST-object does not recognize the MDT-object as its parent, and
2352  * there is no other MDT-object claims as its parent, then just trust the
2353  * given MDT-object as its parent. So update the OST-object filter_fid. */
2354 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2355                                               struct lfsck_component *com,
2356                                               struct lfsck_layout_req *llr,
2357                                               const struct lu_attr *pla)
2358 {
2359         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2360         struct filter_fid               *pfid   = &info->lti_new_pfid;
2361         struct lu_attr                  *tla    = &info->lti_la3;
2362         struct dt_object                *parent = llr->llr_parent->llo_obj;
2363         struct dt_object                *child  = llr->llr_child;
2364         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2365         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2366         struct thandle                  *handle;
2367         struct lu_buf                   *buf;
2368         struct lustre_handle             lh     = { 0 };
2369         int                              rc;
2370         ENTRY;
2371
2372         CDEBUG(D_LFSCK, "Repair unmatched MDT-OST pair for: parent "DFID
2373                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2374                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2375                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
2376
2377         rc = lfsck_layout_lock(env, com, parent, &lh,
2378                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2379         if (rc != 0)
2380                 RETURN(rc);
2381
2382         handle = dt_trans_create(env, dev);
2383         if (IS_ERR(handle))
2384                 GOTO(unlock1, rc = PTR_ERR(handle));
2385
2386         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2387         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2388         /* The ff_parent->f_ver is not the real parent fid->f_ver. Instead,
2389          * it is the OST-object index in the parent MDT-object layout. */
2390         pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
2391         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2392
2393         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
2394         if (rc != 0)
2395                 GOTO(stop, rc);
2396
2397         tla->la_valid = LA_UID | LA_GID;
2398         tla->la_uid = pla->la_uid;
2399         tla->la_gid = pla->la_gid;
2400         rc = dt_declare_attr_set(env, child, tla, handle);
2401         if (rc != 0)
2402                 GOTO(stop, rc);
2403
2404         rc = dt_trans_start(env, dev, handle);
2405         if (rc != 0)
2406                 GOTO(stop, rc);
2407
2408         dt_write_lock(env, parent, 0);
2409         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2410                 GOTO(unlock2, rc = 1);
2411
2412         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
2413                           BYPASS_CAPA);
2414         if (rc != 0)
2415                 GOTO(unlock2, rc);
2416
2417         /* Get the latest parent's owner. */
2418         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2419         if (rc != 0)
2420                 GOTO(unlock2, rc);
2421
2422         tla->la_valid = LA_UID | LA_GID;
2423         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2424
2425         GOTO(unlock2, rc);
2426
2427 unlock2:
2428         dt_write_unlock(env, parent);
2429
2430 stop:
2431         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2432
2433 unlock1:
2434         lfsck_layout_unlock(&lh);
2435
2436         return rc;
2437 }
2438
2439 /* If there are more than one MDT-objects claim as the OST-object's parent,
2440  * and the OST-object only recognizes one of them, then we need to generate
2441  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
2442 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
2443                                                    struct lfsck_component *com,
2444                                                    struct lfsck_layout_req *llr,
2445                                                    struct lu_attr *la,
2446                                                    struct lu_buf *buf)
2447 {
2448         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2449         struct dt_allocation_hint       *hint   = &info->lti_hint;
2450         struct dt_object_format         *dof    = &info->lti_dof;
2451         struct dt_device                *pdev   = com->lc_lfsck->li_next;
2452         struct ost_id                   *oi     = &info->lti_oi;
2453         struct dt_object                *parent = llr->llr_parent->llo_obj;
2454         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
2455         struct dt_object                *child  = NULL;
2456         struct lu_device                *d      = &cdev->dd_lu_dev;
2457         struct lu_object                *o      = NULL;
2458         struct thandle                  *handle;
2459         struct lov_mds_md_v1            *lmm;
2460         struct lov_ost_data_v1          *objs;
2461         struct lustre_handle             lh     = { 0 };
2462         __u32                            magic;
2463         int                              rc;
2464         ENTRY;
2465
2466         CDEBUG(D_LFSCK, "Repair multiple references for: parent "DFID
2467                ", OST-index %u, stripe-index %u, owner %u:%u\n",
2468                PFID(lfsck_dto2fid(parent)), llr->llr_ost_idx,
2469                llr->llr_lov_idx, la->la_uid, la->la_gid);
2470
2471         rc = lfsck_layout_lock(env, com, parent, &lh,
2472                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2473         if (rc != 0)
2474                 RETURN(rc);
2475
2476         handle = dt_trans_create(env, pdev);
2477         if (IS_ERR(handle))
2478                 GOTO(unlock1, rc = PTR_ERR(handle));
2479
2480         o = lu_object_anon(env, d, NULL);
2481         if (IS_ERR(o))
2482                 GOTO(stop, rc = PTR_ERR(o));
2483
2484         child = container_of(o, struct dt_object, do_lu);
2485         o = lu_object_locate(o->lo_header, d->ld_type);
2486         if (unlikely(o == NULL))
2487                 GOTO(stop, rc = -EINVAL);
2488
2489         child = container_of(o, struct dt_object, do_lu);
2490         la->la_valid = LA_UID | LA_GID;
2491         hint->dah_parent = NULL;
2492         hint->dah_mode = 0;
2493         dof->dof_type = DFT_REGULAR;
2494         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
2495         if (rc != 0)
2496                 GOTO(stop, rc);
2497
2498         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2499                                   LU_XATTR_REPLACE, handle);
2500         if (rc != 0)
2501                 GOTO(stop, rc);
2502
2503         rc = dt_trans_start(env, pdev, handle);
2504         if (rc != 0)
2505                 GOTO(stop, rc);
2506
2507         dt_write_lock(env, parent, 0);
2508         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2509                 GOTO(unlock2, rc = 0);
2510
2511         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2512         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2513                 GOTO(unlock2, rc = 0);
2514
2515         lmm = buf->lb_buf;
2516         rc = lfsck_layout_verify_header(lmm);
2517         if (rc != 0)
2518                 GOTO(unlock2, rc);
2519
2520         /* Someone change layout during the LFSCK, no need to repair then. */
2521         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2522                 GOTO(unlock2, rc = 0);
2523
2524         rc = dt_create(env, child, la, hint, dof, handle);
2525         if (rc != 0)
2526                 GOTO(unlock2, rc);
2527
2528         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2529          * been verified in lfsck_layout_verify_header() already. If some
2530          * new magic introduced in the future, then layout LFSCK needs to
2531          * be updated also. */
2532         magic = le32_to_cpu(lmm->lmm_magic);
2533         if (magic == LOV_MAGIC_V1) {
2534                 objs = &(lmm->lmm_objects[0]);
2535         } else {
2536                 LASSERT(magic == LOV_MAGIC_V3);
2537                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2538         }
2539
2540         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2541         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2542         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2543         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2544         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2545         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2546                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2547
2548         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2549
2550 unlock2:
2551         dt_write_unlock(env, parent);
2552
2553 stop:
2554         if (child != NULL)
2555                 lu_object_put(env, &child->do_lu);
2556
2557         dt_trans_stop(env, pdev, handle);
2558
2559 unlock1:
2560         lfsck_layout_unlock(&lh);
2561
2562         return rc;
2563 }
2564
2565 /* If the MDT-object and the OST-object have different owner information,
2566  * then trust the MDT-object, because the normal chown/chgrp handle order
2567  * is from MDT to OST, and it is possible that some chown/chgrp operation
2568  * is partly done. */
2569 static int lfsck_layout_repair_owner(const struct lu_env *env,
2570                                      struct lfsck_component *com,
2571                                      struct lfsck_layout_req *llr,
2572                                      struct lu_attr *pla)
2573 {
2574         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2575         struct lu_attr                  *tla    = &info->lti_la3;
2576         struct dt_object                *parent = llr->llr_parent->llo_obj;
2577         struct dt_object                *child  = llr->llr_child;
2578         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2579         struct thandle                  *handle;
2580         int                              rc;
2581         ENTRY;
2582
2583         CDEBUG(D_LFSCK, "Repair inconsistent file owner for: parent "DFID
2584                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2585                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2586                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
2587
2588         handle = dt_trans_create(env, dev);
2589         if (IS_ERR(handle))
2590                 RETURN(PTR_ERR(handle));
2591
2592         tla->la_uid = pla->la_uid;
2593         tla->la_gid = pla->la_gid;
2594         tla->la_valid = LA_UID | LA_GID;
2595         rc = dt_declare_attr_set(env, child, tla, handle);
2596         if (rc != 0)
2597                 GOTO(stop, rc);
2598
2599         rc = dt_trans_start(env, dev, handle);
2600         if (rc != 0)
2601                 GOTO(stop, rc);
2602
2603         /* Use the dt_object lock to serialize with destroy and attr_set. */
2604         dt_read_lock(env, parent, 0);
2605         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2606                 GOTO(unlock, rc = 1);
2607
2608         /* Get the latest parent's owner. */
2609         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2610         if (rc != 0) {
2611                 CWARN("%s: fail to get the latest parent's ("DFID") owner, "
2612                       "not sure whether some others chown/chgrp during the "
2613                       "LFSCK: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2614                       PFID(lfsck_dto2fid(parent)), rc);
2615
2616                 GOTO(unlock, rc);
2617         }
2618
2619         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
2620         if (unlikely(tla->la_uid != pla->la_uid ||
2621                      tla->la_gid != pla->la_gid))
2622                 GOTO(unlock, rc = 1);
2623
2624         tla->la_valid = LA_UID | LA_GID;
2625         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2626
2627         GOTO(unlock, rc);
2628
2629 unlock:
2630         dt_read_unlock(env, parent);
2631
2632 stop:
2633         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2634
2635         return rc;
2636 }
2637
2638 /* Check whether the OST-object correctly back points to the
2639  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
2640 static int lfsck_layout_check_parent(const struct lu_env *env,
2641                                      struct lfsck_component *com,
2642                                      struct dt_object *parent,
2643                                      const struct lu_fid *pfid,
2644                                      const struct lu_fid *cfid,
2645                                      const struct lu_attr *pla,
2646                                      const struct lu_attr *cla,
2647                                      struct lfsck_layout_req *llr,
2648                                      struct lu_buf *lov_ea, __u32 idx)
2649 {
2650         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2651         struct lu_buf                   *buf    = &info->lti_big_buf;
2652         struct dt_object                *tobj;
2653         struct lov_mds_md_v1            *lmm;
2654         struct lov_ost_data_v1          *objs;
2655         int                              rc;
2656         int                              i;
2657         __u32                            magic;
2658         __u16                            count;
2659         ENTRY;
2660
2661         if (fid_is_zero(pfid)) {
2662                 /* client never wrote. */
2663                 if (cla->la_size == 0 && cla->la_blocks == 0) {
2664                         if (unlikely(cla->la_uid != pla->la_uid ||
2665                                      cla->la_gid != pla->la_gid))
2666                                 RETURN (LLIT_INCONSISTENT_OWNER);
2667
2668                         RETURN(0);
2669                 }
2670
2671                 RETURN(LLIT_UNMATCHED_PAIR);
2672         }
2673
2674         if (unlikely(!fid_is_sane(pfid)))
2675                 RETURN(LLIT_UNMATCHED_PAIR);
2676
2677         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
2678                 if (llr->llr_lov_idx == idx)
2679                         RETURN(0);
2680
2681                 RETURN(LLIT_UNMATCHED_PAIR);
2682         }
2683
2684         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
2685         if (tobj == NULL)
2686                 RETURN(LLIT_UNMATCHED_PAIR);
2687
2688         if (IS_ERR(tobj))
2689                 RETURN(PTR_ERR(tobj));
2690
2691         if (!dt_object_exists(tobj))
2692                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2693
2694         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
2695          * remote one on another MDT. Then check whether the given OST-object
2696          * is in such layout. If yes, it is multiple referenced, otherwise it
2697          * is unmatched referenced case. */
2698         rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
2699         if (rc == 0)
2700                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2701
2702         if (rc < 0)
2703                 GOTO(out, rc);
2704
2705         lmm = buf->lb_buf;
2706         rc = lfsck_layout_verify_header(lmm);
2707         if (rc != 0)
2708                 GOTO(out, rc);
2709
2710         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2711          * been verified in lfsck_layout_verify_header() already. If some
2712          * new magic introduced in the future, then layout LFSCK needs to
2713          * be updated also. */
2714         magic = le32_to_cpu(lmm->lmm_magic);
2715         if (magic == LOV_MAGIC_V1) {
2716                 objs = &(lmm->lmm_objects[0]);
2717         } else {
2718                 LASSERT(magic == LOV_MAGIC_V3);
2719                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2720         }
2721
2722         count = le16_to_cpu(lmm->lmm_stripe_count);
2723         for (i = 0; i < count; i++, objs++) {
2724                 struct lu_fid           *tfid   = &info->lti_fid2;
2725                 struct ost_id           *oi     = &info->lti_oi;
2726
2727                 if (is_dummy_lov_ost_data(objs))
2728                         continue;
2729
2730                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2731                 ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
2732                 if (lu_fid_eq(cfid, tfid)) {
2733                         *lov_ea = *buf;
2734
2735                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
2736                 }
2737         }
2738
2739         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2740
2741 out:
2742         lfsck_object_put(env, tobj);
2743
2744         return rc;
2745 }
2746
2747 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
2748                                              struct lfsck_component *com,
2749                                              struct lfsck_layout_req *llr)
2750 {
2751         struct lfsck_layout                  *lo     = com->lc_file_ram;
2752         struct lfsck_thread_info             *info   = lfsck_env_info(env);
2753         struct filter_fid_old                *pea    = &info->lti_old_pfid;
2754         struct lu_fid                        *pfid   = &info->lti_fid;
2755         struct lu_buf                        *buf    = NULL;
2756         struct dt_object                     *parent = llr->llr_parent->llo_obj;
2757         struct dt_object                     *child  = llr->llr_child;
2758         struct lu_attr                       *pla    = &info->lti_la;
2759         struct lu_attr                       *cla    = &info->lti_la2;
2760         struct lfsck_instance                *lfsck  = com->lc_lfsck;
2761         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
2762         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
2763         __u32                                 idx    = 0;
2764         int                                   rc;
2765         ENTRY;
2766
2767         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
2768         if (rc != 0) {
2769                 if (lu_object_is_dying(parent->do_lu.lo_header))
2770                         RETURN(0);
2771
2772                 GOTO(out, rc);
2773         }
2774
2775         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
2776         if (rc == -ENOENT) {
2777                 if (lu_object_is_dying(parent->do_lu.lo_header))
2778                         RETURN(0);
2779
2780                 type = LLIT_DANGLING;
2781                 goto repair;
2782         }
2783
2784         if (rc != 0)
2785                 GOTO(out, rc);
2786
2787         buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
2788         rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
2789         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
2790                      rc != sizeof(struct filter_fid))) {
2791                 type = LLIT_UNMATCHED_PAIR;
2792                 goto repair;
2793         }
2794
2795         if (rc < 0 && rc != -ENODATA)
2796                 GOTO(out, rc);
2797
2798         if (rc == -ENODATA) {
2799                 fid_zero(pfid);
2800         } else {
2801                 fid_le_to_cpu(pfid, &pea->ff_parent);
2802                 /* OST-object does not save parent FID::f_ver, instead,
2803                  * the OST-object index in the parent MDT-object layout
2804                  * EA reuses the pfid->f_ver. */
2805                 idx = pfid->f_ver;
2806                 pfid->f_ver = 0;
2807         }
2808
2809         rc = lfsck_layout_check_parent(env, com, parent, pfid,
2810                                        lu_object_fid(&child->do_lu),
2811                                        pla, cla, llr, buf, idx);
2812         if (rc > 0) {
2813                 type = rc;
2814                 goto repair;
2815         }
2816
2817         if (rc < 0)
2818                 GOTO(out, rc);
2819
2820         if (unlikely(cla->la_uid != pla->la_uid ||
2821                      cla->la_gid != pla->la_gid)) {
2822                 type = LLIT_INCONSISTENT_OWNER;
2823                 goto repair;
2824         }
2825
2826 repair:
2827         if (bk->lb_param & LPF_DRYRUN) {
2828                 if (type != LLIT_NONE)
2829                         GOTO(out, rc = 1);
2830                 else
2831                         GOTO(out, rc = 0);
2832         }
2833
2834         switch (type) {
2835         case LLIT_DANGLING:
2836                 memset(cla, 0, sizeof(*cla));
2837                 cla->la_uid = pla->la_uid;
2838                 cla->la_gid = pla->la_gid;
2839                 cla->la_mode = S_IFREG | 0666;
2840                 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2841                                 LA_ATIME | LA_MTIME | LA_CTIME;
2842                 rc = lfsck_layout_recreate_ostobj(env, com, llr, cla);
2843                 break;
2844         case LLIT_UNMATCHED_PAIR:
2845                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
2846                 break;
2847         case LLIT_MULTIPLE_REFERENCED:
2848                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
2849                                                              pla, buf);
2850                 break;
2851         case LLIT_INCONSISTENT_OWNER:
2852                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
2853                 break;
2854         default:
2855                 rc = 0;
2856                 break;
2857         }
2858
2859         GOTO(out, rc);
2860
2861 out:
2862         down_write(&com->lc_sem);
2863         if (rc < 0) {
2864                 /* If cannot touch the target server,
2865                  * mark the LFSCK as INCOMPLETE. */
2866                 if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT ||
2867                     rc == -EHOSTDOWN || rc == -EHOSTUNREACH) {
2868                         CERROR("%s: Fail to talk with OST %x: rc = %d.\n",
2869                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
2870                         lo->ll_flags |= LF_INCOMPLETE;
2871                         lo->ll_objs_skipped++;
2872                         rc = 0;
2873                 } else {
2874                         lo->ll_objs_failed_phase1++;
2875                 }
2876         } else if (rc > 0) {
2877                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
2878                          "unknown type = %d\n", type);
2879
2880                 lo->ll_objs_repaired[type - 1]++;
2881         }
2882         up_write(&com->lc_sem);
2883
2884         return rc;
2885 }
2886
2887 static int lfsck_layout_assistant(void *args)
2888 {
2889         struct lfsck_thread_args        *lta     = args;
2890         struct lu_env                   *env     = &lta->lta_env;
2891         struct lfsck_component          *com     = lta->lta_com;
2892         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
2893         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
2894         struct lfsck_position           *pos     = &com->lc_pos_start;
2895         struct lfsck_thread_info        *info    = lfsck_env_info(env);
2896         struct lfsck_request            *lr      = &info->lti_lr;
2897         struct lfsck_layout_master_data *llmd    = com->lc_data;
2898         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2899         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2900         struct lfsck_layout_req         *llr;
2901         struct l_wait_info               lwi     = { 0 };
2902         int                              rc      = 0;
2903         int                              rc1     = 0;
2904         ENTRY;
2905
2906         memset(lr, 0, sizeof(*lr));
2907         lr->lr_event = LE_START;
2908         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2909                        LSV_ASYNC_WINDOWS;
2910         lr->lr_speed = bk->lb_speed_limit;
2911         lr->lr_version = bk->lb_version;
2912         lr->lr_param = bk->lb_param;
2913         lr->lr_async_windows = bk->lb_async_windows;
2914         lr->lr_flags = LEF_TO_OST;
2915         if (pos->lp_oit_cookie <= 1)
2916                 lr->lr_param |= LPF_RESET;
2917
2918         rc = lfsck_layout_master_notify_others(env, com, lr);
2919         if (rc != 0) {
2920                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
2921                        lfsck_lfsck2name(lfsck), rc);
2922                 GOTO(fini, rc);
2923         }
2924
2925         spin_lock(&llmd->llmd_lock);
2926         thread_set_flags(athread, SVC_RUNNING);
2927         spin_unlock(&llmd->llmd_lock);
2928         wake_up_all(&mthread->t_ctl_waitq);
2929
2930         while (1) {
2931                 while (!list_empty(&llmd->llmd_req_list)) {
2932                         bool wakeup = false;
2933
2934                         if (unlikely(llmd->llmd_exit))
2935                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
2936
2937                         llr = list_entry(llmd->llmd_req_list.next,
2938                                          struct lfsck_layout_req,
2939                                          llr_list);
2940                         /* Only the lfsck_layout_assistant thread itself can
2941                          * remove the "llr" from the head of the list, LFSCK
2942                          * engine thread only inserts other new "lld" at the
2943                          * end of the list. So it is safe to handle current
2944                          * "llr" without the spin_lock. */
2945                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
2946                         spin_lock(&llmd->llmd_lock);
2947                         list_del_init(&llr->llr_list);
2948                         if (bk->lb_async_windows != 0 &&
2949                             llmd->llmd_prefetched >= bk->lb_async_windows)
2950                                 wakeup = true;
2951
2952                         llmd->llmd_prefetched--;
2953                         spin_unlock(&llmd->llmd_lock);
2954                         if (wakeup)
2955                                 wake_up_all(&mthread->t_ctl_waitq);
2956
2957                         lfsck_layout_req_fini(env, llr);
2958                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
2959                                 GOTO(cleanup1, rc);
2960                 }
2961
2962                 /* Wakeup the master engine if it is waiting in checkpoint. */
2963                 wake_up_all(&mthread->t_ctl_waitq);
2964
2965                 l_wait_event(athread->t_ctl_waitq,
2966                              !lfsck_layout_req_empty(llmd) ||
2967                              llmd->llmd_exit ||
2968                              llmd->llmd_to_post ||
2969                              llmd->llmd_to_double_scan,
2970                              &lwi);
2971
2972                 if (unlikely(llmd->llmd_exit))
2973                         GOTO(cleanup1, rc = llmd->llmd_post_result);
2974
2975                 if (!list_empty(&llmd->llmd_req_list))
2976                         continue;
2977
2978                 if (llmd->llmd_to_post) {
2979                         llmd->llmd_to_post = 0;
2980                         LASSERT(llmd->llmd_post_result > 0);
2981
2982                         memset(lr, 0, sizeof(*lr));
2983                         lr->lr_event = LE_PHASE1_DONE;
2984                         lr->lr_status = llmd->llmd_post_result;
2985                         rc = lfsck_layout_master_notify_others(env, com, lr);
2986                         if (rc != 0)
2987                                 CERROR("%s: failed to notify others "
2988                                        "for layout post: rc = %d\n",
2989                                        lfsck_lfsck2name(lfsck), rc);
2990
2991                         /* Wakeup the master engine to go ahead. */
2992                         wake_up_all(&mthread->t_ctl_waitq);
2993                 }
2994
2995                 if (llmd->llmd_to_double_scan) {
2996                         llmd->llmd_to_double_scan = 0;
2997                         atomic_inc(&lfsck->li_double_scan_count);
2998                         llmd->llmd_in_double_scan = 1;
2999                         wake_up_all(&mthread->t_ctl_waitq);
3000
3001                         com->lc_new_checked = 0;
3002                         com->lc_new_scanned = 0;
3003                         com->lc_time_last_checkpoint = cfs_time_current();
3004                         com->lc_time_next_checkpoint =
3005                                 com->lc_time_last_checkpoint +
3006                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3007
3008                         while (llmd->llmd_in_double_scan) {
3009                                 struct lfsck_tgt_descs  *ltds =
3010                                                         &lfsck->li_ost_descs;
3011                                 struct lfsck_tgt_desc   *ltd;
3012
3013                                 rc = lfsck_layout_master_query_others(env, com);
3014                                 if (lfsck_layout_master_to_orphan(llmd))
3015                                         goto orphan;
3016
3017                                 if (rc < 0)
3018                                         GOTO(cleanup2, rc);
3019
3020                                 /* Pull LFSCK status on related targets once
3021                                  * per 30 seconds if we are not notified. */
3022                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3023                                                            cfs_time_seconds(1),
3024                                                            NULL, NULL);
3025                                 rc = l_wait_event(athread->t_ctl_waitq,
3026                                         lfsck_layout_master_to_orphan(llmd) ||
3027                                         llmd->llmd_exit ||
3028                                         !thread_is_running(mthread),
3029                                         &lwi);
3030
3031                                 if (unlikely(llmd->llmd_exit ||
3032                                              !thread_is_running(mthread)))
3033                                         GOTO(cleanup2, rc = 0);
3034
3035                                 if (rc == -ETIMEDOUT)
3036                                         continue;
3037
3038                                 if (rc < 0)
3039                                         GOTO(cleanup2, rc);
3040
3041 orphan:
3042                                 spin_lock(&ltds->ltd_lock);
3043                                 while (!list_empty(
3044                                                 &llmd->llmd_ost_phase2_list)) {
3045                                         ltd = list_entry(
3046                                               llmd->llmd_ost_phase2_list.next,
3047                                               struct lfsck_tgt_desc,
3048                                               ltd_layout_phase_list);
3049                                         list_del_init(
3050                                                 &ltd->ltd_layout_phase_list);
3051                                         spin_unlock(&ltds->ltd_lock);
3052
3053                                         if (bk->lb_param & LPF_ALL_TGT) {
3054                                                 rc = lfsck_layout_scan_orphan(
3055                                                                 env, com, ltd);
3056                                                 if (rc != 0 &&
3057                                                     bk->lb_param & LPF_FAILOUT)
3058                                                         GOTO(cleanup2, rc);
3059                                         }
3060
3061                                         if (unlikely(llmd->llmd_exit ||
3062                                                 !thread_is_running(mthread)))
3063                                                 GOTO(cleanup2, rc = 0);
3064
3065                                         spin_lock(&ltds->ltd_lock);
3066                                 }
3067
3068                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3069                                         spin_unlock(&ltds->ltd_lock);
3070                                         GOTO(cleanup2, rc = 1);
3071                                 }
3072                                 spin_unlock(&ltds->ltd_lock);
3073                         }
3074                 }
3075         }
3076
3077 cleanup1:
3078         /* Cleanup the unfinished requests. */
3079         spin_lock(&llmd->llmd_lock);
3080         if (rc < 0)
3081                 llmd->llmd_assistant_status = rc;
3082
3083         while (!list_empty(&llmd->llmd_req_list)) {
3084                 llr = list_entry(llmd->llmd_req_list.next,
3085                                  struct lfsck_layout_req,
3086                                  llr_list);
3087                 list_del_init(&llr->llr_list);
3088                 llmd->llmd_prefetched--;
3089                 spin_unlock(&llmd->llmd_lock);
3090                 lfsck_layout_req_fini(env, llr);
3091                 spin_lock(&llmd->llmd_lock);
3092         }
3093         spin_unlock(&llmd->llmd_lock);
3094
3095         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3096                  llmd->llmd_prefetched);
3097
3098 cleanup2:
3099         memset(lr, 0, sizeof(*lr));
3100         if (rc > 0) {
3101                 lr->lr_event = LE_PHASE2_DONE;
3102                 lr->lr_status = rc;
3103         } else if (rc == 0) {
3104                 if (lfsck->li_flags & LPF_ALL_TGT) {
3105                         lr->lr_event = LE_STOP;
3106                         lr->lr_status = LS_STOPPED;
3107                 } else {
3108                         lr->lr_event = LE_PEER_EXIT;
3109                         switch (lfsck->li_status) {
3110                         case LS_PAUSED:
3111                         case LS_CO_PAUSED:
3112                                 lr->lr_status = LS_CO_PAUSED;
3113                                 break;
3114                         case LS_STOPPED:
3115                         case LS_CO_STOPPED:
3116                                 lr->lr_status = LS_CO_STOPPED;
3117                                 break;
3118                         default:
3119                                 CERROR("%s: unknown status: rc = %d\n",
3120                                        lfsck_lfsck2name(lfsck),
3121                                        lfsck->li_status);
3122                                 lr->lr_status = LS_CO_FAILED;
3123                                 break;
3124                         }
3125                 }
3126         } else {
3127                 if (lfsck->li_flags & LPF_ALL_TGT) {
3128                         lr->lr_event = LE_STOP;
3129                         lr->lr_status = LS_FAILED;
3130                 } else {
3131                         lr->lr_event = LE_PEER_EXIT;
3132                         lr->lr_status = LS_CO_FAILED;
3133                 }
3134         }
3135
3136         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3137         if (rc1 != 0) {
3138                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
3139                        lfsck_lfsck2name(lfsck), rc1);
3140                 rc = rc1;
3141         }
3142
3143         /* Under force exit case, some requests may be just freed without
3144          * verification, those objects should be re-handled when next run.
3145          * So not update the on-disk tracing file under such case. */
3146         if (!llmd->llmd_exit)
3147                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
3148
3149 fini:
3150         if (llmd->llmd_in_double_scan)
3151                 atomic_dec(&lfsck->li_double_scan_count);
3152
3153         spin_lock(&llmd->llmd_lock);
3154         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3155         thread_set_flags(athread, SVC_STOPPED);
3156         wake_up_all(&mthread->t_ctl_waitq);
3157         spin_unlock(&llmd->llmd_lock);
3158         lfsck_thread_args_fini(lta);
3159
3160         return rc;
3161 }
3162
3163 static int
3164 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3165                                    struct ptlrpc_request *req,
3166                                    void *args, int rc)
3167 {
3168         struct lfsck_layout_slave_async_args *llsaa = args;
3169         struct obd_export                    *exp   = llsaa->llsaa_exp;
3170         struct lfsck_component               *com   = llsaa->llsaa_com;
3171         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3172         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3173         bool                                  done  = false;
3174
3175         if (rc != 0) {
3176                 /* It is quite probably caused by target crash,
3177                  * to make the LFSCK can go ahead, assume that
3178                  * the target finished the LFSCK prcoessing. */
3179                 done = true;
3180         } else {
3181                 struct lfsck_reply *lr;
3182
3183                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3184                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3185                     lr->lr_status != LS_SCANNING_PHASE2)
3186                         done = true;
3187         }
3188         if (done)
3189                 lfsck_layout_llst_del(llsd, llst);
3190         lfsck_layout_llst_put(llst);
3191         lfsck_component_put(env, com);
3192         class_export_put(exp);
3193
3194         return 0;
3195 }
3196
3197 static int lfsck_layout_async_query(const struct lu_env *env,
3198                                     struct lfsck_component *com,
3199                                     struct obd_export *exp,
3200                                     struct lfsck_layout_slave_target *llst,
3201                                     struct lfsck_request *lr,
3202                                     struct ptlrpc_request_set *set)
3203 {
3204         struct lfsck_layout_slave_async_args *llsaa;
3205         struct ptlrpc_request                *req;
3206         struct lfsck_request                 *tmp;
3207         int                                   rc;
3208         ENTRY;
3209
3210         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3211         if (req == NULL)
3212                 RETURN(-ENOMEM);
3213
3214         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3215         if (rc != 0) {
3216                 ptlrpc_request_free(req);
3217                 RETURN(rc);
3218         }
3219
3220         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3221         *tmp = *lr;
3222         ptlrpc_request_set_replen(req);
3223
3224         llsaa = ptlrpc_req_async_args(req);
3225         llsaa->llsaa_exp = exp;
3226         llsaa->llsaa_com = lfsck_component_get(com);
3227         llsaa->llsaa_llst = llst;
3228         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3229         ptlrpc_set_add_req(set, req);
3230
3231         RETURN(0);
3232 }
3233
3234 static int lfsck_layout_async_notify(const struct lu_env *env,
3235                                      struct obd_export *exp,
3236                                      struct lfsck_request *lr,
3237                                      struct ptlrpc_request_set *set)
3238 {
3239         struct ptlrpc_request   *req;
3240         struct lfsck_request    *tmp;
3241         int                      rc;
3242         ENTRY;
3243
3244         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3245         if (req == NULL)
3246                 RETURN(-ENOMEM);
3247
3248         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3249         if (rc != 0) {
3250                 ptlrpc_request_free(req);
3251                 RETURN(rc);
3252         }
3253
3254         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3255         *tmp = *lr;
3256         ptlrpc_request_set_replen(req);
3257         ptlrpc_set_add_req(set, req);
3258
3259         RETURN(0);
3260 }
3261
3262 static int
3263 lfsck_layout_slave_query_master(const struct lu_env *env,
3264                                 struct lfsck_component *com)
3265 {
3266         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3267         struct lfsck_instance            *lfsck = com->lc_lfsck;
3268         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3269         struct lfsck_layout_slave_target *llst;
3270         struct obd_export                *exp;
3271         struct ptlrpc_request_set        *set;
3272         int                               rc    = 0;
3273         int                               rc1   = 0;
3274         ENTRY;
3275
3276         set = ptlrpc_prep_set();
3277         if (set == NULL)
3278                 RETURN(-ENOMEM);
3279
3280         memset(lr, 0, sizeof(*lr));
3281         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3282         lr->lr_event = LE_QUERY;
3283         lr->lr_active = LT_LAYOUT;
3284
3285         llsd->llsd_touch_gen++;
3286         spin_lock(&llsd->llsd_lock);
3287         while (!list_empty(&llsd->llsd_master_list)) {
3288                 llst = list_entry(llsd->llsd_master_list.next,
3289                                   struct lfsck_layout_slave_target,
3290                                   llst_list);
3291                 if (llst->llst_gen == llsd->llsd_touch_gen)
3292                         break;
3293
3294                 llst->llst_gen = llsd->llsd_touch_gen;
3295                 list_del(&llst->llst_list);
3296                 list_add_tail(&llst->llst_list,
3297                               &llsd->llsd_master_list);
3298                 atomic_inc(&llst->llst_ref);
3299                 spin_unlock(&llsd->llsd_lock);
3300
3301                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3302                                                llst->llst_index);
3303                 if (exp == NULL) {
3304                         lfsck_layout_llst_del(llsd, llst);
3305                         lfsck_layout_llst_put(llst);
3306                         spin_lock(&llsd->llsd_lock);
3307                         continue;
3308                 }
3309
3310                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3311                 if (rc != 0) {
3312                         CERROR("%s: slave fail to query %s for layout: "
3313                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3314                                exp->exp_obd->obd_name, rc);
3315                         rc1 = rc;
3316                         lfsck_layout_llst_put(llst);
3317                         class_export_put(exp);
3318                 }
3319                 spin_lock(&llsd->llsd_lock);
3320         }
3321         spin_unlock(&llsd->llsd_lock);
3322
3323         rc = ptlrpc_set_wait(set);
3324         ptlrpc_set_destroy(set);
3325
3326         RETURN(rc1 != 0 ? rc1 : rc);
3327 }
3328
3329 static void
3330 lfsck_layout_slave_notify_master(const struct lu_env *env,
3331                                  struct lfsck_component *com,
3332                                  enum lfsck_events event, int result)
3333 {
3334         struct lfsck_instance            *lfsck = com->lc_lfsck;
3335         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3336         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3337         struct lfsck_layout_slave_target *llst;
3338         struct obd_export                *exp;
3339         struct ptlrpc_request_set        *set;
3340         int                               rc;
3341         ENTRY;
3342
3343         set = ptlrpc_prep_set();
3344         if (set == NULL)
3345                 RETURN_EXIT;
3346
3347         memset(lr, 0, sizeof(*lr));
3348         lr->lr_event = event;
3349         lr->lr_flags = LEF_FROM_OST;
3350         lr->lr_status = result;
3351         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3352         lr->lr_active = LT_LAYOUT;
3353         llsd->llsd_touch_gen++;
3354         spin_lock(&llsd->llsd_lock);
3355         while (!list_empty(&llsd->llsd_master_list)) {
3356                 llst = list_entry(llsd->llsd_master_list.next,
3357                                   struct lfsck_layout_slave_target,
3358                                   llst_list);
3359                 if (llst->llst_gen == llsd->llsd_touch_gen)
3360                         break;
3361
3362                 llst->llst_gen = llsd->llsd_touch_gen;
3363                 list_del(&llst->llst_list);
3364                 list_add_tail(&llst->llst_list,
3365                               &llsd->llsd_master_list);
3366                 atomic_inc(&llst->llst_ref);
3367                 spin_unlock(&llsd->llsd_lock);
3368
3369                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3370                                                llst->llst_index);
3371                 if (exp == NULL) {
3372                         lfsck_layout_llst_del(llsd, llst);
3373                         lfsck_layout_llst_put(llst);
3374                         spin_lock(&llsd->llsd_lock);
3375                         continue;
3376                 }
3377
3378                 rc = lfsck_layout_async_notify(env, exp, lr, set);
3379                 if (rc != 0)
3380                         CERROR("%s: slave fail to notify %s for layout: "
3381                                "rc = %d\n", lfsck_lfsck2name(lfsck),
3382                                exp->exp_obd->obd_name, rc);
3383                 lfsck_layout_llst_put(llst);
3384                 class_export_put(exp);
3385                 spin_lock(&llsd->llsd_lock);
3386         }
3387         spin_unlock(&llsd->llsd_lock);
3388
3389         ptlrpc_set_wait(set);
3390         ptlrpc_set_destroy(set);
3391
3392         RETURN_EXIT;
3393 }
3394
3395 /* layout APIs */
3396
3397 static int lfsck_layout_reset(const struct lu_env *env,
3398                               struct lfsck_component *com, bool init)
3399 {
3400         struct lfsck_layout     *lo    = com->lc_file_ram;
3401         int                      rc;
3402
3403         down_write(&com->lc_sem);
3404         if (init) {
3405                 memset(lo, 0, com->lc_file_size);
3406         } else {
3407                 __u32 count = lo->ll_success_count;
3408                 __u64 last_time = lo->ll_time_last_complete;
3409
3410                 memset(lo, 0, com->lc_file_size);
3411                 lo->ll_success_count = count;
3412                 lo->ll_time_last_complete = last_time;
3413         }
3414
3415         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
3416         lo->ll_status = LS_INIT;
3417
3418         rc = lfsck_layout_store(env, com);
3419         up_write(&com->lc_sem);
3420
3421         return rc;
3422 }
3423
3424 static void lfsck_layout_fail(const struct lu_env *env,
3425                               struct lfsck_component *com, bool new_checked)
3426 {
3427         struct lfsck_layout *lo = com->lc_file_ram;
3428
3429         down_write(&com->lc_sem);
3430         if (new_checked)
3431                 com->lc_new_checked++;
3432         lo->ll_objs_failed_phase1++;
3433         if (lo->ll_pos_first_inconsistent == 0) {
3434                 struct lfsck_instance *lfsck = com->lc_lfsck;
3435
3436                 lo->ll_pos_first_inconsistent =
3437                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3438                                                         lfsck->li_di_oit);
3439         }
3440         up_write(&com->lc_sem);
3441 }
3442
3443 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
3444                                           struct lfsck_component *com, bool init)
3445 {
3446         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3447         struct lfsck_layout             *lo      = com->lc_file_ram;
3448         struct lfsck_layout_master_data *llmd    = com->lc_data;
3449         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3450         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3451         struct l_wait_info               lwi     = { 0 };
3452         int                              rc;
3453
3454         if (com->lc_new_checked == 0 && !init)
3455                 return 0;
3456
3457         l_wait_event(mthread->t_ctl_waitq,
3458                      list_empty(&llmd->llmd_req_list) ||
3459                      !thread_is_running(mthread) ||
3460                      thread_is_stopped(athread),
3461                      &lwi);
3462
3463         if (!thread_is_running(mthread) || thread_is_stopped(athread))
3464                 return 0;
3465
3466         down_write(&com->lc_sem);
3467         if (init) {
3468                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
3469         } else {
3470                 lo->ll_pos_last_checkpoint =
3471                                         lfsck->li_pos_current.lp_oit_cookie;
3472                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3473                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3474                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3475                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3476                 com->lc_new_checked = 0;
3477         }
3478
3479         rc = lfsck_layout_store(env, com);
3480         up_write(&com->lc_sem);
3481
3482         return rc;
3483 }
3484
3485 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
3486                                          struct lfsck_component *com, bool init)
3487 {
3488         struct lfsck_instance   *lfsck = com->lc_lfsck;
3489         struct lfsck_layout     *lo    = com->lc_file_ram;
3490         int                      rc;
3491
3492         if (com->lc_new_checked == 0 && !init)
3493                 return 0;
3494
3495         down_write(&com->lc_sem);
3496
3497         if (init) {
3498                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
3499         } else {
3500                 lo->ll_pos_last_checkpoint =
3501                                         lfsck->li_pos_current.lp_oit_cookie;
3502                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3503                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3504                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3505                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3506                 com->lc_new_checked = 0;
3507         }
3508
3509         rc = lfsck_layout_store(env, com);
3510
3511         up_write(&com->lc_sem);
3512
3513         return rc;
3514 }
3515
3516 static int lfsck_layout_prep(const struct lu_env *env,
3517                              struct lfsck_component *com,
3518                              struct lfsck_start *start)
3519 {
3520         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3521         struct lfsck_layout     *lo     = com->lc_file_ram;
3522         struct lfsck_position   *pos    = &com->lc_pos_start;
3523
3524         fid_zero(&pos->lp_dir_parent);
3525         pos->lp_dir_cookie = 0;
3526         if (lo->ll_status == LS_COMPLETED ||
3527             lo->ll_status == LS_PARTIAL ||
3528             /* To handle orphan, must scan from the beginning. */
3529             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
3530                 int rc;
3531
3532                 rc = lfsck_layout_reset(env, com, false);
3533                 if (rc != 0)
3534                         return rc;
3535         }
3536
3537         down_write(&com->lc_sem);
3538         lo->ll_time_latest_start = cfs_time_current_sec();
3539         spin_lock(&lfsck->li_lock);
3540         if (lo->ll_flags & LF_SCANNED_ONCE) {
3541                 if (!lfsck->li_drop_dryrun ||
3542                     lo->ll_pos_first_inconsistent == 0) {
3543                         lo->ll_status = LS_SCANNING_PHASE2;
3544                         list_del_init(&com->lc_link);
3545                         list_add_tail(&com->lc_link,
3546                                       &lfsck->li_list_double_scan);
3547                         pos->lp_oit_cookie = 0;
3548                 } else {
3549                         int i;
3550
3551                         lo->ll_status = LS_SCANNING_PHASE1;
3552                         lo->ll_run_time_phase1 = 0;
3553                         lo->ll_run_time_phase2 = 0;
3554                         lo->ll_objs_checked_phase1 = 0;
3555                         lo->ll_objs_checked_phase2 = 0;
3556                         lo->ll_objs_failed_phase1 = 0;
3557                         lo->ll_objs_failed_phase2 = 0;
3558                         for (i = 0; i < LLIT_MAX; i++)
3559                                 lo->ll_objs_repaired[i] = 0;
3560
3561                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3562                         fid_zero(&com->lc_fid_latest_scanned_phase2);
3563                 }
3564         } else {
3565                 lo->ll_status = LS_SCANNING_PHASE1;
3566                 if (!lfsck->li_drop_dryrun ||
3567                     lo->ll_pos_first_inconsistent == 0)
3568                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
3569                 else
3570                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3571         }
3572         spin_unlock(&lfsck->li_lock);
3573         up_write(&com->lc_sem);
3574
3575         return 0;
3576 }
3577
3578 static int lfsck_layout_slave_prep(const struct lu_env *env,
3579                                    struct lfsck_component *com,
3580                                    struct lfsck_start_param *lsp)
3581 {
3582         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3583         struct lfsck_start              *start  = lsp->lsp_start;
3584         int                              rc;
3585
3586         rc = lfsck_layout_prep(env, com, start);
3587         if (rc != 0 || !lsp->lsp_index_valid)
3588                 return rc;
3589
3590         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
3591         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
3592                 LASSERT(!llsd->llsd_rbtree_valid);
3593
3594                 write_lock(&llsd->llsd_rb_lock);
3595                 rc = lfsck_rbtree_setup(env, com);
3596                 write_unlock(&llsd->llsd_rb_lock);
3597         }
3598
3599         return rc;
3600 }
3601
3602 static int lfsck_layout_master_prep(const struct lu_env *env,
3603                                     struct lfsck_component *com,
3604                                     struct lfsck_start_param *lsp)
3605 {
3606         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3607         struct lfsck_layout_master_data *llmd    = com->lc_data;
3608         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3609         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3610         struct lfsck_thread_args        *lta;
3611         long                             rc;
3612         ENTRY;
3613
3614         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
3615         if (rc != 0)
3616                 RETURN(rc);
3617
3618         llmd->llmd_assistant_status = 0;
3619         llmd->llmd_post_result = 0;
3620         llmd->llmd_to_post = 0;
3621         llmd->llmd_to_double_scan = 0;
3622         llmd->llmd_in_double_scan = 0;
3623         llmd->llmd_exit = 0;
3624         thread_set_flags(athread, 0);
3625
3626         lta = lfsck_thread_args_init(lfsck, com, lsp);
3627         if (IS_ERR(lta))
3628                 RETURN(PTR_ERR(lta));
3629
3630         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
3631         if (IS_ERR_VALUE(rc)) {
3632                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
3633                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
3634                 lfsck_thread_args_fini(lta);
3635         } else {
3636                 struct l_wait_info lwi = { 0 };
3637
3638                 l_wait_event(mthread->t_ctl_waitq,
3639                              thread_is_running(athread) ||
3640                              thread_is_stopped(athread),
3641                              &lwi);
3642                 if (unlikely(!thread_is_running(athread)))
3643                         rc = llmd->llmd_assistant_status;
3644                 else
3645                         rc = 0;
3646         }
3647
3648         RETURN(rc);
3649 }
3650
3651 /* Pre-fetch the attribute for each stripe in the given layout EA. */
3652 static int lfsck_layout_scan_stripes(const struct lu_env *env,
3653                                      struct lfsck_component *com,
3654                                      struct dt_object *parent,
3655                                      struct lov_mds_md_v1 *lmm)
3656 {
3657         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3658         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3659         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3660         struct lfsck_layout             *lo      = com->lc_file_ram;
3661         struct lfsck_layout_master_data *llmd    = com->lc_data;
3662         struct lfsck_layout_object      *llo     = NULL;
3663         struct lov_ost_data_v1          *objs;
3664         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
3665         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3666         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3667                 struct l_wait_info       lwi     = { 0 };
3668         struct lu_buf                   *buf;
3669         int                              rc      = 0;
3670         int                              i;
3671         __u32                            magic;
3672         __u16                            count;
3673         __u16                            gen;
3674         ENTRY;
3675
3676         buf = lfsck_buf_get(env, &info->lti_old_pfid,
3677                             sizeof(struct filter_fid_old));
3678         count = le16_to_cpu(lmm->lmm_stripe_count);
3679         gen = le16_to_cpu(lmm->lmm_layout_gen);
3680         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3681          * been verified in lfsck_layout_verify_header() already. If some
3682          * new magic introduced in the future, then layout LFSCK needs to
3683          * be updated also. */
3684         magic = le32_to_cpu(lmm->lmm_magic);
3685         if (magic == LOV_MAGIC_V1) {
3686                 objs = &(lmm->lmm_objects[0]);
3687         } else {
3688                 LASSERT(magic == LOV_MAGIC_V3);
3689                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3690         }
3691
3692         for (i = 0; i < count; i++, objs++) {
3693                 struct lu_fid           *fid    = &info->lti_fid;
3694                 struct ost_id           *oi     = &info->lti_oi;
3695                 struct lfsck_layout_req *llr;
3696                 struct lfsck_tgt_desc   *tgt    = NULL;
3697                 struct dt_object        *cobj   = NULL;
3698                 __u32                    index  =
3699                                         le32_to_cpu(objs->l_ost_idx);
3700                 bool                     wakeup = false;
3701
3702                 if (is_dummy_lov_ost_data(objs))
3703                         continue;
3704
3705                 l_wait_event(mthread->t_ctl_waitq,
3706                              bk->lb_async_windows == 0 ||
3707                              llmd->llmd_prefetched < bk->lb_async_windows ||
3708                              !thread_is_running(mthread) ||
3709                              thread_is_stopped(athread),
3710                              &lwi);
3711
3712                 if (unlikely(!thread_is_running(mthread)) ||
3713                              thread_is_stopped(athread))
3714                         GOTO(out, rc = 0);
3715
3716                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3717                 ostid_to_fid(fid, oi, index);
3718                 tgt = lfsck_tgt_get(ltds, index);
3719                 if (unlikely(tgt == NULL)) {
3720                         CERROR("%s: Cannot talk with OST %x which did not join "
3721                                "the layout LFSCK.\n",
3722                                lfsck_lfsck2name(lfsck), index);
3723                         lo->ll_flags |= LF_INCOMPLETE;
3724                         goto next;
3725                 }
3726
3727                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
3728                 if (IS_ERR(cobj)) {
3729                         rc = PTR_ERR(cobj);
3730                         goto next;
3731                 }
3732
3733                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
3734                 if (rc != 0)
3735                         goto next;
3736
3737                 rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
3738                                           BYPASS_CAPA);
3739                 if (rc != 0)
3740                         goto next;
3741
3742                 if (llo == NULL) {
3743                         llo = lfsck_layout_object_init(env, parent, gen);
3744                         if (IS_ERR(llo)) {
3745                                 rc = PTR_ERR(llo);
3746                                 goto next;
3747                         }
3748                 }
3749
3750                 llr = lfsck_layout_req_init(llo, cobj, index, i);
3751                 if (IS_ERR(llr)) {
3752                         rc = PTR_ERR(llr);
3753                         goto next;
3754                 }
3755
3756                 cobj = NULL;
3757                 spin_lock(&llmd->llmd_lock);
3758                 if (llmd->llmd_assistant_status < 0) {
3759                         spin_unlock(&llmd->llmd_lock);
3760                         lfsck_layout_req_fini(env, llr);
3761                         lfsck_tgt_put(tgt);
3762                         RETURN(llmd->llmd_assistant_status);
3763                 }
3764
3765                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
3766                 if (llmd->llmd_prefetched == 0)
3767                         wakeup = true;
3768
3769                 llmd->llmd_prefetched++;
3770                 spin_unlock(&llmd->llmd_lock);
3771                 if (wakeup)
3772                         wake_up_all(&athread->t_ctl_waitq);
3773
3774 next:
3775                 down_write(&com->lc_sem);
3776                 com->lc_new_checked++;
3777                 if (rc < 0)
3778                         lo->ll_objs_failed_phase1++;
3779                 up_write(&com->lc_sem);
3780
3781                 if (cobj != NULL && !IS_ERR(cobj))
3782                         lu_object_put(env, &cobj->do_lu);
3783
3784                 if (likely(tgt != NULL))
3785                         lfsck_tgt_put(tgt);
3786
3787                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3788                         GOTO(out, rc);
3789         }
3790
3791         GOTO(out, rc = 0);
3792
3793 out:
3794         if (llo != NULL && !IS_ERR(llo))
3795                 lfsck_layout_object_put(env, llo);
3796
3797         return rc;
3798 }
3799
3800 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
3801  * the OST-object's attribute and generate an structure lfsck_layout_req on the
3802  * list ::llmd_req_list.
3803  *
3804  * For each request on above list, the lfsck_layout_assistant thread compares
3805  * the OST side attribute with local attribute, if inconsistent, then repair it.
3806  *
3807  * All above processing is async mode with pipeline. */
3808 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
3809                                         struct lfsck_component *com,
3810                                         struct dt_object *obj)
3811 {
3812         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3813         struct ost_id                   *oi     = &info->lti_oi;
3814         struct lfsck_layout             *lo     = com->lc_file_ram;
3815         struct lfsck_layout_master_data *llmd   = com->lc_data;
3816         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3817         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
3818         struct thandle                  *handle = NULL;
3819         struct lu_buf                   *buf    = &info->lti_big_buf;
3820         struct lov_mds_md_v1            *lmm    = NULL;
3821         struct dt_device                *dev    = lfsck->li_bottom;
3822         struct lustre_handle             lh     = { 0 };
3823         ssize_t                          buflen = buf->lb_len;
3824         int                              rc     = 0;
3825         bool                             locked = false;
3826         bool                             stripe = false;
3827         ENTRY;
3828
3829         if (!S_ISREG(lfsck_object_type(obj)))
3830                 GOTO(out, rc = 0);
3831
3832         if (llmd->llmd_assistant_status < 0)
3833                 GOTO(out, rc = -ESRCH);
3834
3835         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
3836         lmm_oi_cpu_to_le(oi, oi);
3837         dt_read_lock(env, obj, 0);
3838         locked = true;
3839
3840 again:
3841         rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
3842         if (rc <= 0)
3843                 GOTO(out, rc);
3844
3845         buf->lb_len = rc;
3846         lmm = buf->lb_buf;
3847         rc = lfsck_layout_verify_header(lmm);
3848         if (rc != 0)
3849                 GOTO(out, rc);
3850
3851         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
3852                 GOTO(out, stripe = true);
3853
3854         /* Inconsistent lmm_oi, should be repaired. */
3855         CDEBUG(D_LFSCK, "Repair bad lmm_oi for "DFID"\n",
3856                PFID(lfsck_dto2fid(obj)));
3857
3858         if (bk->lb_param & LPF_DRYRUN) {
3859                 down_write(&com->lc_sem);
3860                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
3861                 up_write(&com->lc_sem);
3862
3863                 GOTO(out, stripe = true);
3864         }
3865
3866         if (!lustre_handle_is_used(&lh)) {
3867                 dt_read_unlock(env, obj);
3868                 locked = false;
3869                 buf->lb_len = buflen;
3870                 rc = lfsck_layout_lock(env, com, obj, &lh,
3871                                        MDS_INODELOCK_LAYOUT |
3872                                        MDS_INODELOCK_XATTR);
3873                 if (rc != 0)
3874                         GOTO(out, rc);
3875
3876                 handle = dt_trans_create(env, dev);
3877                 if (IS_ERR(handle))
3878                         GOTO(out, rc = PTR_ERR(handle));
3879
3880                 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
3881                                           LU_XATTR_REPLACE, handle);
3882                 if (rc != 0)
3883                         GOTO(out, rc);
3884
3885                 rc = dt_trans_start_local(env, dev, handle);
3886                 if (rc != 0)
3887                         GOTO(out, rc);
3888
3889                 dt_write_lock(env, obj, 0);
3890                 locked = true;
3891
3892                 goto again;
3893         }
3894
3895         lmm->lmm_oi = *oi;
3896         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
3897                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3898         if (rc != 0)
3899                 GOTO(out, rc);
3900
3901         down_write(&com->lc_sem);
3902         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
3903         up_write(&com->lc_sem);
3904
3905         GOTO(out, stripe = true);
3906
3907 out:
3908         if (locked) {
3909                 if (lustre_handle_is_used(&lh))
3910                         dt_write_unlock(env, obj);
3911                 else
3912                         dt_read_unlock(env, obj);
3913         }
3914
3915         if (handle != NULL && !IS_ERR(handle))
3916                 dt_trans_stop(env, dev, handle);
3917
3918         lfsck_layout_unlock(&lh);
3919         if (stripe) {
3920                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
3921         } else {
3922                 down_write(&com->lc_sem);
3923                 com->lc_new_checked++;
3924                 if (rc < 0)
3925                         lo->ll_objs_failed_phase1++;
3926                 up_write(&com->lc_sem);
3927         }
3928         buf->lb_len = buflen;
3929
3930         return rc;
3931 }
3932
3933 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
3934                                        struct lfsck_component *com,
3935                                        struct dt_object *obj)
3936 {
3937         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3938         struct lfsck_layout             *lo     = com->lc_file_ram;
3939         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
3940         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3941         struct lfsck_layout_seq         *lls;
3942         __u64                            seq;
3943         __u64                            oid;
3944         int                              rc;
3945         ENTRY;
3946
3947         LASSERT(llsd != NULL);
3948
3949         lfsck_rbtree_update_bitmap(env, com, fid, false);
3950
3951         down_write(&com->lc_sem);
3952         if (fid_is_idif(fid))
3953                 seq = 0;
3954         else if (!fid_is_norm(fid) ||
3955                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
3956                 GOTO(unlock, rc = 0);
3957         else
3958                 seq = fid_seq(fid);
3959         com->lc_new_checked++;
3960
3961         lls = lfsck_layout_seq_lookup(llsd, seq);
3962         if (lls == NULL) {
3963                 OBD_ALLOC_PTR(lls);
3964                 if (unlikely(lls == NULL))
3965                         GOTO(unlock, rc = -ENOMEM);
3966
3967                 INIT_LIST_HEAD(&lls->lls_list);
3968                 lls->lls_seq = seq;
3969                 rc = lfsck_layout_lastid_load(env, com, lls);
3970                 if (rc != 0) {
3971                         lo->ll_objs_failed_phase1++;
3972                         OBD_FREE_PTR(lls);
3973                         GOTO(unlock, rc);
3974                 }
3975
3976                 lfsck_layout_seq_insert(llsd, lls);
3977         }
3978
3979         if (unlikely(fid_is_last_id(fid)))
3980                 GOTO(unlock, rc = 0);
3981
3982         oid = fid_oid(fid);
3983         if (oid > lls->lls_lastid_known)
3984                 lls->lls_lastid_known = oid;
3985
3986         if (oid > lls->lls_lastid) {
3987                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
3988                         /* OFD may create new objects during LFSCK scanning. */
3989                         rc = lfsck_layout_lastid_reload(env, com, lls);
3990                         if (unlikely(rc != 0))
3991                                 CWARN("%s: failed to reload LAST_ID for "LPX64
3992                                       ": rc = %d\n",
3993                                       lfsck_lfsck2name(com->lc_lfsck),
3994                                       lls->lls_seq, rc);
3995                         if (oid <= lls->lls_lastid)
3996                                 GOTO(unlock, rc = 0);
3997
3998                         LASSERT(lfsck->li_out_notify != NULL);
3999
4000                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4001                                              LE_LASTID_REBUILDING);
4002                         lo->ll_flags |= LF_CRASHED_LASTID;
4003                 }
4004
4005                 lls->lls_lastid = oid;
4006                 lls->lls_dirty = 1;
4007         }
4008
4009         GOTO(unlock, rc = 0);
4010
4011 unlock:
4012         up_write(&com->lc_sem);
4013
4014         return rc;
4015 }
4016
/* Directory-entry callback of the layout LFSCK: it does nothing with the
 * entry and always reports success. */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj, struct lu_dirent *ent)
{
        /* None of the arguments is used. */
        return 0;
}
4024
4025 static int lfsck_layout_master_post(const struct lu_env *env,
4026                                     struct lfsck_component *com,
4027                                     int result, bool init)
4028 {
4029         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4030         struct lfsck_layout             *lo      = com->lc_file_ram;
4031         struct lfsck_layout_master_data *llmd    = com->lc_data;
4032         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4033         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4034         struct l_wait_info               lwi     = { 0 };
4035         int                              rc;
4036         ENTRY;
4037
4038
4039         llmd->llmd_post_result = result;
4040         llmd->llmd_to_post = 1;
4041         if (llmd->llmd_post_result <= 0)
4042                 llmd->llmd_exit = 1;
4043
4044         wake_up_all(&athread->t_ctl_waitq);
4045         l_wait_event(mthread->t_ctl_waitq,
4046                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
4047                      thread_is_stopped(athread),
4048                      &lwi);
4049
4050         if (llmd->llmd_assistant_status < 0)
4051                 result = llmd->llmd_assistant_status;
4052
4053         down_write(&com->lc_sem);
4054         spin_lock(&lfsck->li_lock);
4055         /* When LFSCK failed, there may be some prefetched objects those are
4056          * not been processed yet, we do not know the exactly position, then
4057          * just restart from last check-point next time. */
4058         if (!init && !llmd->llmd_exit)
4059                 lo->ll_pos_last_checkpoint =
4060                                         lfsck->li_pos_current.lp_oit_cookie;
4061
4062         if (result > 0) {
4063                 lo->ll_status = LS_SCANNING_PHASE2;
4064                 lo->ll_flags |= LF_SCANNED_ONCE;
4065                 lo->ll_flags &= ~LF_UPGRADE;
4066                 list_del_init(&com->lc_link);
4067                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4068         } else if (result == 0) {
4069                 lo->ll_status = lfsck->li_status;
4070                 if (lo->ll_status == 0)
4071                         lo->ll_status = LS_STOPPED;
4072                 if (lo->ll_status != LS_PAUSED) {
4073                         list_del_init(&com->lc_link);
4074                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4075                 }
4076         } else {
4077                 lo->ll_status = LS_FAILED;
4078                 list_del_init(&com->lc_link);
4079                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4080         }
4081         spin_unlock(&lfsck->li_lock);
4082
4083         if (!init) {
4084                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4085                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4086                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4087                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4088                 com->lc_new_checked = 0;
4089         }
4090
4091         rc = lfsck_layout_store(env, com);
4092         up_write(&com->lc_sem);
4093
4094         RETURN(rc);
4095 }
4096
4097 static int lfsck_layout_slave_post(const struct lu_env *env,
4098                                    struct lfsck_component *com,
4099                                    int result, bool init)
4100 {
4101         struct lfsck_instance   *lfsck = com->lc_lfsck;
4102         struct lfsck_layout     *lo    = com->lc_file_ram;
4103         int                      rc;
4104         bool                     done  = false;
4105
4106         rc = lfsck_layout_lastid_store(env, com);
4107         if (rc != 0)
4108                 result = rc;
4109
4110         LASSERT(lfsck->li_out_notify != NULL);
4111
4112         down_write(&com->lc_sem);
4113
4114         spin_lock(&lfsck->li_lock);
4115         if (!init)
4116                 lo->ll_pos_last_checkpoint =
4117                                         lfsck->li_pos_current.lp_oit_cookie;
4118         if (result > 0) {
4119                 lo->ll_status = LS_SCANNING_PHASE2;
4120                 lo->ll_flags |= LF_SCANNED_ONCE;
4121                 if (lo->ll_flags & LF_CRASHED_LASTID) {
4122                         done = true;
4123                         lo->ll_flags &= ~LF_CRASHED_LASTID;
4124                 }
4125                 lo->ll_flags &= ~LF_UPGRADE;
4126                 list_del_init(&com->lc_link);
4127                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4128         } else if (result == 0) {
4129                 lo->ll_status = lfsck->li_status;
4130                 if (lo->ll_status == 0)
4131                         lo->ll_status = LS_STOPPED;
4132                 if (lo->ll_status != LS_PAUSED) {
4133                         list_del_init(&com->lc_link);
4134                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4135                 }
4136         } else {
4137                 lo->ll_status = LS_FAILED;
4138                 list_del_init(&com->lc_link);
4139                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4140         }
4141         spin_unlock(&lfsck->li_lock);
4142
4143         if (done)
4144                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4145                                      LE_LASTID_REBUILT);
4146
4147         if (!init) {
4148                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4149                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4150                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4151                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4152                 com->lc_new_checked = 0;
4153         }
4154
4155         rc = lfsck_layout_store(env, com);
4156
4157         up_write(&com->lc_sem);
4158
4159         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
4160
4161         if (result <= 0)
4162                 lfsck_rbtree_cleanup(env, com);
4163
4164         return rc;
4165 }
4166
4167 static int lfsck_layout_dump(const struct lu_env *env,
4168                              struct lfsck_component *com, char *buf, int len)
4169 {
4170         struct lfsck_instance   *lfsck = com->lc_lfsck;
4171         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
4172         struct lfsck_layout     *lo    = com->lc_file_ram;
4173         int                      save  = len;
4174         int                      ret   = -ENOSPC;
4175         int                      rc;
4176
4177         down_read(&com->lc_sem);
4178         rc = snprintf(buf, len,
4179                       "name: lfsck_layout\n"
4180                       "magic: %#x\n"
4181                       "version: %d\n"
4182                       "status: %s\n",
4183                       lo->ll_magic,
4184                       bk->lb_version,
4185                       lfsck_status2names(lo->ll_status));
4186         if (rc <= 0)
4187                 goto out;
4188
4189         buf += rc;
4190         len -= rc;
4191         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
4192                              "flags");
4193         if (rc < 0)
4194                 goto out;
4195
4196         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
4197                              "param");
4198         if (rc < 0)
4199                 goto out;
4200
4201         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
4202                              "time_since_last_completed");
4203         if (rc < 0)
4204                 goto out;
4205
4206         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
4207                              "time_since_latest_start");
4208         if (rc < 0)
4209                 goto out;
4210
4211         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
4212                              "time_since_last_checkpoint");
4213         if (rc < 0)
4214                 goto out;
4215
4216         rc = snprintf(buf, len,
4217                       "latest_start_position: "LPU64"\n"
4218                       "last_checkpoint_position: "LPU64"\n"
4219                       "first_failure_position: "LPU64"\n",
4220                       lo->ll_pos_latest_start,
4221                       lo->ll_pos_last_checkpoint,
4222                       lo->ll_pos_first_inconsistent);
4223         if (rc <= 0)
4224                 goto out;
4225
4226         buf += rc;
4227         len -= rc;
4228
4229         rc = snprintf(buf, len,
4230                       "success_count: %u\n"
4231                       "repaired_dangling: "LPU64"\n"
4232                       "repaired_unmatched_pair: "LPU64"\n"
4233                       "repaired_multiple_referenced: "LPU64"\n"
4234                       "repaired_orphan: "LPU64"\n"
4235                       "repaired_inconsistent_owner: "LPU64"\n"
4236                       "repaired_others: "LPU64"\n"
4237                       "skipped: "LPU64"\n"
4238                       "failed_phase1: "LPU64"\n"
4239                       "failed_phase2: "LPU64"\n",
4240                       lo->ll_success_count,
4241                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
4242                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
4243                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
4244                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
4245                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
4246                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
4247                       lo->ll_objs_skipped,
4248                       lo->ll_objs_failed_phase1,
4249                       lo->ll_objs_failed_phase2);
4250         if (rc <= 0)
4251                 goto out;
4252
4253         buf += rc;
4254         len -= rc;
4255
4256         if (lo->ll_status == LS_SCANNING_PHASE1) {
4257                 __u64 pos;
4258                 const struct dt_it_ops *iops;
4259                 cfs_duration_t duration = cfs_time_current() -
4260                                           lfsck->li_time_last_checkpoint;
4261                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4262                 __u64 speed = checked;
4263                 __u64 new_checked = com->lc_new_checked * HZ;
4264                 __u32 rtime = lo->ll_run_time_phase1 +
4265                               cfs_duration_sec(duration + HALF_SEC);
4266
4267                 if (duration != 0)
4268                         do_div(new_checked, duration);
4269                 if (rtime != 0)
4270                         do_div(speed, rtime);
4271                 rc = snprintf(buf, len,
4272                               "checked_phase1: "LPU64"\n"
4273                               "checked_phase2: "LPU64"\n"
4274                               "run_time_phase1: %u seconds\n"
4275                               "run_time_phase2: %u seconds\n"
4276                               "average_speed_phase1: "LPU64" items/sec\n"
4277                               "average_speed_phase2: N/A\n"
4278                               "real-time_speed_phase1: "LPU64" items/sec\n"
4279                               "real-time_speed_phase2: N/A\n",
4280                               checked,
4281                               lo->ll_objs_checked_phase2,
4282                               rtime,
4283                               lo->ll_run_time_phase2,
4284                               speed,
4285                               new_checked);
4286                 if (rc <= 0)
4287                         goto out;
4288
4289                 buf += rc;
4290                 len -= rc;
4291
4292                 LASSERT(lfsck->li_di_oit != NULL);
4293
4294                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
4295
4296                 /* The low layer otable-based iteration position may NOT
4297                  * exactly match the layout-based directory traversal
4298                  * cookie. Generally, it is not a serious issue. But the
4299                  * caller should NOT make assumption on that. */
4300                 pos = iops->store(env, lfsck->li_di_oit);
4301                 if (!lfsck->li_current_oit_processed)
4302                         pos--;
4303                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
4304                 if (rc <= 0)
4305                         goto out;
4306
4307                 buf += rc;
4308                 len -= rc;
4309         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
4310                 cfs_duration_t duration = cfs_time_current() -
4311                                           lfsck->li_time_last_checkpoint;
4312                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
4313                 __u64 speed = checked;
4314                 __u64 new_checked = com->lc_new_checked * HZ;
4315                 __u32 rtime = lo->ll_run_time_phase1 +
4316                               cfs_duration_sec(duration + HALF_SEC);
4317
4318                 if (duration != 0)
4319                         do_div(new_checked, duration);
4320                 if (rtime != 0)
4321                         do_div(speed, rtime);
4322                 rc = snprintf(buf, len,
4323                               "checked_phase1: "LPU64"\n"
4324                               "checked_phase2: "LPU64"\n"
4325                               "run_time_phase1: %u seconds\n"
4326                               "run_time_phase2: %u seconds\n"
4327                               "average_speed_phase1: "LPU64" items/sec\n"
4328                               "average_speed_phase2: N/A\n"
4329                               "real-time_speed_phase1: "LPU64" items/sec\n"
4330                               "real-time_speed_phase2: N/A\n"
4331                               "current_position: "DFID"\n",
4332                               checked,
4333                               lo->ll_objs_checked_phase2,
4334                               rtime,
4335                               lo->ll_run_time_phase2,
4336                               speed,
4337                               new_checked,
4338                               PFID(&com->lc_fid_latest_scanned_phase2));
4339                 if (rc <= 0)
4340                         goto out;
4341
4342                 buf += rc;
4343                 len -= rc;
4344         } else {
4345                 __u64 speed1 = lo->ll_objs_checked_phase1;
4346                 __u64 speed2 = lo->ll_objs_checked_phase2;
4347
4348                 if (lo->ll_run_time_phase1 != 0)
4349                         do_div(speed1, lo->ll_run_time_phase1);
4350                 if (lo->ll_run_time_phase2 != 0)
4351                         do_div(speed2, lo->ll_run_time_phase2);
4352                 rc = snprintf(buf, len,
4353                               "checked_phase1: "LPU64"\n"
4354                               "checked_phase2: "LPU64"\n"
4355                               "run_time_phase1: %u seconds\n"
4356                               "run_time_phase2: %u seconds\n"
4357                               "average_speed_phase1: "LPU64" items/sec\n"
4358                               "average_speed_phase2: "LPU64" objs/sec\n"
4359                               "real-time_speed_phase1: N/A\n"
4360                               "real-time_speed_phase2: N/A\n"
4361                               "current_position: N/A\n",
4362                               lo->ll_objs_checked_phase1,
4363                               lo->ll_objs_checked_phase2,
4364                               lo->ll_run_time_phase1,
4365                               lo->ll_run_time_phase2,
4366                               speed1,
4367                               speed2);
4368                 if (rc <= 0)
4369                         goto out;
4370
4371                 buf += rc;
4372                 len -= rc;
4373         }
4374         ret = save - len;
4375
4376 out:
4377         up_read(&com->lc_sem);
4378
4379         return ret;
4380 }
4381
4382 static int lfsck_layout_master_double_scan(const struct lu_env *env,
4383                                            struct lfsck_component *com)
4384 {
4385         struct lfsck_layout_master_data *llmd    = com->lc_data;
4386         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
4387         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4388         struct lfsck_layout             *lo      = com->lc_file_ram;
4389         struct l_wait_info               lwi     = { 0 };
4390
4391         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
4392                 return 0;
4393
4394         llmd->llmd_to_double_scan = 1;
4395         wake_up_all(&athread->t_ctl_waitq);
4396         l_wait_event(mthread->t_ctl_waitq,
4397                      llmd->llmd_in_double_scan ||
4398                      thread_is_stopped(athread),
4399                      &lwi);
4400         if (llmd->llmd_assistant_status < 0)
4401                 return llmd->llmd_assistant_status;
4402
4403         return 0;
4404 }
4405
4406 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
4407                                           struct lfsck_component *com)
4408 {
4409         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4410         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4411         struct lfsck_layout             *lo     = com->lc_file_ram;
4412         struct ptlrpc_thread            *thread = &lfsck->li_thread;
4413         int                              rc;
4414         ENTRY;
4415
4416         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
4417                 lfsck_rbtree_cleanup(env, com);
4418                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
4419                 RETURN(0);
4420         }
4421
4422         atomic_inc(&lfsck->li_double_scan_count);
4423
4424         com->lc_new_checked = 0;
4425         com->lc_new_scanned = 0;
4426         com->lc_time_last_checkpoint = cfs_time_current();
4427         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
4428                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
4429
4430         while (1) {
4431                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
4432                                                      NULL, NULL);
4433
4434                 rc = lfsck_layout_slave_query_master(env, com);
4435                 if (list_empty(&llsd->llsd_master_list)) {
4436                         if (unlikely(!thread_is_running(thread)))
4437                                 rc = 0;
4438                         else
4439                                 rc = 1;
4440
4441                         GOTO(done, rc);
4442                 }
4443
4444                 if (rc < 0)
4445                         GOTO(done, rc);
4446
4447                 rc = l_wait_event(thread->t_ctl_waitq,
4448                                   !thread_is_running(thread) ||
4449                                   list_empty(&llsd->llsd_master_list),
4450                                   &lwi);
4451                 if (unlikely(!thread_is_running(thread)))
4452                         GOTO(done, rc = 0);
4453
4454                 if (rc == -ETIMEDOUT)
4455                         continue;
4456
4457                 GOTO(done, rc = (rc < 0 ? rc : 1));
4458         }
4459
4460 done:
4461         rc = lfsck_layout_double_scan_result(env, com, rc);
4462
4463         lfsck_rbtree_cleanup(env, com);
4464         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
4465         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
4466                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
4467
4468         return rc;
4469 }
4470
4471 static void lfsck_layout_master_data_release(const struct lu_env *env,
4472                                              struct lfsck_component *com)
4473 {
4474         struct lfsck_layout_master_data *llmd   = com->lc_data;
4475         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4476         struct lfsck_tgt_descs          *ltds;
4477         struct lfsck_tgt_desc           *ltd;
4478         struct lfsck_tgt_desc           *next;
4479
4480         LASSERT(llmd != NULL);
4481         LASSERT(thread_is_init(&llmd->llmd_thread) ||
4482                 thread_is_stopped(&llmd->llmd_thread));
4483         LASSERT(list_empty(&llmd->llmd_req_list));
4484
4485         com->lc_data = NULL;
4486
4487         ltds = &lfsck->li_ost_descs;
4488         spin_lock(&ltds->ltd_lock);
4489         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
4490                                  ltd_layout_phase_list) {
4491                 list_del_init(&ltd->ltd_layout_phase_list);
4492         }
4493         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
4494                                  ltd_layout_phase_list) {
4495                 list_del_init(&ltd->ltd_layout_phase_list);
4496         }
4497         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
4498                                  ltd_layout_list) {
4499                 list_del_init(&ltd->ltd_layout_list);
4500         }
4501         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
4502                                  ltd_layout_phase_list) {
4503                 list_del_init(&ltd->ltd_layout_phase_list);
4504         }
4505         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
4506                                  ltd_layout_phase_list) {
4507                 list_del_init(&ltd->ltd_layout_phase_list);
4508         }
4509         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
4510                                  ltd_layout_list) {
4511                 list_del_init(&ltd->ltd_layout_list);
4512         }
4513         spin_unlock(&ltds->ltd_lock);
4514
4515         OBD_FREE_PTR(llmd);
4516 }
4517
4518 static void lfsck_layout_slave_data_release(const struct lu_env *env,
4519                                             struct lfsck_component *com)
4520 {
4521         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4522         struct lfsck_layout_seq          *lls;
4523         struct lfsck_layout_seq          *next;
4524         struct lfsck_layout_slave_target *llst;
4525         struct lfsck_layout_slave_target *tmp;
4526
4527         LASSERT(llsd != NULL);
4528
4529         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
4530                                      lls_list) {
4531                 list_del_init(&lls->lls_list);
4532                 lfsck_object_put(env, lls->lls_lastid_obj);
4533                 OBD_FREE_PTR(lls);
4534         }
4535
4536         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
4537                                  llst_list) {
4538                 list_del_init(&llst->llst_list);
4539                 OBD_FREE_PTR(llst);
4540         }
4541
4542         lfsck_rbtree_cleanup(env, com);
4543         com->lc_data = NULL;
4544         OBD_FREE_PTR(llsd);
4545 }
4546
4547 static void lfsck_layout_master_quit(const struct lu_env *env,
4548                                      struct lfsck_component *com)
4549 {
4550         struct lfsck_layout_master_data *llmd    = com->lc_data;
4551         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
4552         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4553         struct l_wait_info               lwi     = { 0 };
4554
4555         llmd->llmd_exit = 1;
4556         wake_up_all(&athread->t_ctl_waitq);
4557         l_wait_event(mthread->t_ctl_waitq,
4558                      thread_is_init(athread) ||
4559                      thread_is_stopped(athread),
4560                      &lwi);
4561 }
4562
/**
 * Quit handler for the layout LFSCK slave.
 *
 * The only work done here is the call to lfsck_rbtree_cleanup().
 *
 * \param[in] env	execution environment
 * \param[in] com	layout LFSCK component
 */
static void
lfsck_layout_slave_quit(const struct lu_env *env, struct lfsck_component *com)
{
	lfsck_rbtree_cleanup(env, com);
}
4568
4569 static int lfsck_layout_master_in_notify(const struct lu_env *env,
4570                                          struct lfsck_component *com,
4571                                          struct lfsck_request *lr)
4572 {
4573         struct lfsck_instance           *lfsck = com->lc_lfsck;
4574         struct lfsck_layout             *lo    = com->lc_file_ram;
4575         struct lfsck_layout_master_data *llmd  = com->lc_data;
4576         struct lfsck_tgt_descs          *ltds;
4577         struct lfsck_tgt_desc           *ltd;
4578         bool                             fail  = false;
4579         ENTRY;
4580
4581         if (lr->lr_event != LE_PHASE1_DONE &&
4582             lr->lr_event != LE_PHASE2_DONE &&
4583             lr->lr_event != LE_PEER_EXIT)
4584                 RETURN(-EINVAL);
4585
4586         if (lr->lr_flags & LEF_FROM_OST)
4587                 ltds = &lfsck->li_ost_descs;
4588         else
4589                 ltds = &lfsck->li_mdt_descs;
4590         spin_lock(&ltds->ltd_lock);
4591         ltd = LTD_TGT(ltds, lr->lr_index);
4592         if (ltd == NULL) {
4593                 spin_unlock(&ltds->ltd_lock);
4594
4595                 RETURN(-ENODEV);
4596         }
4597
4598         list_del_init(&ltd->ltd_layout_phase_list);
4599         switch (lr->lr_event) {
4600         case LE_PHASE1_DONE:
4601                 if (lr->lr_status <= 0) {
4602                         ltd->ltd_layout_done = 1;
4603                         list_del_init(&ltd->ltd_layout_list);
4604                         CWARN("%s: %s %x failed/stopped at phase1: rc = %d.\n",
4605                               lfsck_lfsck2name(lfsck),
4606                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4607                               ltd->ltd_index, lr->lr_status);
4608                         lo->ll_flags |= LF_INCOMPLETE;
4609                         fail = true;
4610                         break;
4611                 }
4612
4613                 if (lr->lr_flags & LEF_FROM_OST) {
4614                         if (list_empty(&ltd->ltd_layout_list))
4615                                 list_add_tail(&ltd->ltd_layout_list,
4616                                               &llmd->llmd_ost_list);
4617                         list_add_tail(&ltd->ltd_layout_phase_list,
4618                                       &llmd->llmd_ost_phase2_list);
4619                 } else {
4620                         if (list_empty(&ltd->ltd_layout_list))
4621                                 list_add_tail(&ltd->ltd_layout_list,
4622                                               &llmd->llmd_mdt_list);
4623                         list_add_tail(&ltd->ltd_layout_phase_list,
4624                                       &llmd->llmd_mdt_phase2_list);
4625                 }
4626                 break;
4627         case LE_PHASE2_DONE:
4628                 ltd->ltd_layout_done = 1;
4629                 list_del_init(&ltd->ltd_layout_list);
4630                 break;
4631         case LE_PEER_EXIT:
4632                 fail = true;
4633                 ltd->ltd_layout_done = 1;
4634                 list_del_init(&ltd->ltd_layout_list);
4635                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
4636                         CWARN("%s: the peer %s %x exit layout LFSCK.\n",
4637                               lfsck_lfsck2name(lfsck),
4638                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4639                               ltd->ltd_index);
4640                         lo->ll_flags |= LF_INCOMPLETE;
4641                 }
4642                 break;
4643         default:
4644                 break;
4645         }
4646         spin_unlock(&ltds->ltd_lock);
4647
4648         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4649                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4650
4651                 memset(stop, 0, sizeof(*stop));
4652                 stop->ls_status = lr->lr_status;
4653                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4654                 lfsck_stop(env, lfsck->li_bottom, stop);
4655         } else if (lfsck_layout_master_to_orphan(llmd)) {
4656                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
4657         }
4658
4659         RETURN(0);
4660 }
4661
4662 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
4663                                         struct lfsck_component *com,
4664                                         struct lfsck_request *lr)
4665 {
4666         struct lfsck_instance            *lfsck = com->lc_lfsck;
4667         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4668         struct lfsck_layout_slave_target *llst;
4669         ENTRY;
4670
4671         if (lr->lr_event == LE_FID_ACCESSED) {
4672                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
4673
4674                 RETURN(0);
4675         }
4676
4677         if (lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT)
4678                 RETURN(-EINVAL);
4679
4680         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
4681         if (llst == NULL)
4682                 RETURN(-ENODEV);
4683
4684         lfsck_layout_llst_put(llst);
4685         if (list_empty(&llsd->llsd_master_list))
4686                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
4687
4688         if (lr->lr_event == LE_PEER_EXIT &&
4689             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4690                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4691
4692                 memset(stop, 0, sizeof(*stop));
4693                 stop->ls_status = lr->lr_status;
4694                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4695                 lfsck_stop(env, lfsck->li_bottom, stop);
4696         }
4697
4698         RETURN(0);
4699 }
4700
4701 static int lfsck_layout_query(const struct lu_env *env,
4702                               struct lfsck_component *com)
4703 {
4704         struct lfsck_layout *lo = com->lc_file_ram;
4705
4706         return lo->ll_status;
4707 }
4708
4709 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
4710                                            struct lfsck_component *com,
4711                                            struct lfsck_tgt_descs *ltds,
4712                                            struct lfsck_tgt_desc *ltd,
4713                                            struct ptlrpc_request_set *set)
4714 {
4715         struct lfsck_thread_info          *info  = lfsck_env_info(env);
4716         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
4717         struct lfsck_request              *lr    = &info->lti_lr;
4718         struct lfsck_instance             *lfsck = com->lc_lfsck;
4719         int                                rc;
4720
4721         spin_lock(&ltds->ltd_lock);
4722         if (list_empty(&ltd->ltd_layout_list)) {
4723                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
4724                 spin_unlock(&ltds->ltd_lock);
4725
4726                 return 0;
4727         }
4728
4729         list_del_init(&ltd->ltd_layout_phase_list);
4730         list_del_init(&ltd->ltd_layout_list);
4731         spin_unlock(&ltds->ltd_lock);
4732
4733         memset(lr, 0, sizeof(*lr));
4734         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4735         lr->lr_event = LE_PEER_EXIT;
4736         lr->lr_active = LT_LAYOUT;
4737         lr->lr_status = LS_CO_PAUSED;
4738         if (ltds == &lfsck->li_ost_descs)
4739                 lr->lr_flags = LEF_TO_OST;
4740
4741         laia->laia_com = com;
4742         laia->laia_ltds = ltds;
4743         atomic_inc(&ltd->ltd_ref);
4744         laia->laia_ltd = ltd;
4745         laia->laia_lr = lr;
4746         laia->laia_shared = 0;
4747
4748         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
4749                                  lfsck_layout_master_async_interpret,
4750                                  laia, LFSCK_NOTIFY);
4751         if (rc != 0) {
4752                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
4753                        lfsck_lfsck2name(lfsck),
4754                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4755                        ltd->ltd_index, rc);
4756                 lfsck_tgt_put(ltd);
4757         }
4758
4759         return rc;
4760 }
4761
4762 /* with lfsck::li_lock held */
4763 static int lfsck_layout_slave_join(const struct lu_env *env,
4764                                    struct lfsck_component *com,
4765                                    struct lfsck_start_param *lsp)
4766 {
4767         struct lfsck_instance            *lfsck = com->lc_lfsck;
4768         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4769         struct lfsck_layout_slave_target *llst;
4770         struct lfsck_start               *start = lsp->lsp_start;
4771         int                               rc    = 0;
4772         ENTRY;
4773
4774         if (!lsp->lsp_index_valid || start == NULL ||
4775             !(start->ls_flags & LPF_ALL_TGT) ||
4776             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT))
4777                 RETURN(-EALREADY);
4778
4779         spin_unlock(&lfsck->li_lock);
4780         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4781         spin_lock(&lfsck->li_lock);
4782         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
4783                 spin_unlock(&lfsck->li_lock);
4784                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
4785                                                       true);
4786                 if (llst != NULL)
4787                         lfsck_layout_llst_put(llst);
4788                 spin_lock(&lfsck->li_lock);
4789                 rc = -EAGAIN;
4790         }
4791
4792         RETURN(rc);
4793 }
4794
4795 static struct lfsck_operations lfsck_layout_master_ops = {
4796         .lfsck_reset            = lfsck_layout_reset,
4797         .lfsck_fail             = lfsck_layout_fail,
4798         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
4799         .lfsck_prep             = lfsck_layout_master_prep,
4800         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
4801         .lfsck_exec_dir         = lfsck_layout_exec_dir,
4802         .lfsck_post             = lfsck_layout_master_post,
4803         .lfsck_interpret        = lfsck_layout_master_async_interpret,
4804         .lfsck_dump             = lfsck_layout_dump,
4805         .lfsck_double_scan      = lfsck_layout_master_double_scan,
4806         .lfsck_data_release     = lfsck_layout_master_data_release,
4807         .lfsck_quit             = lfsck_layout_master_quit,
4808         .lfsck_in_notify        = lfsck_layout_master_in_notify,
4809         .lfsck_query            = lfsck_layout_query,
4810         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
4811 };
4812
4813 static struct lfsck_operations lfsck_layout_slave_ops = {
4814         .lfsck_reset            = lfsck_layout_reset,
4815         .lfsck_fail             = lfsck_layout_fail,
4816         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
4817         .lfsck_prep             = lfsck_layout_slave_prep,
4818         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
4819         .lfsck_exec_dir         = lfsck_layout_exec_dir,
4820         .lfsck_post             = lfsck_layout_slave_post,
4821         .lfsck_dump             = lfsck_layout_dump,
4822         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
4823         .lfsck_data_release     = lfsck_layout_slave_data_release,
4824         .lfsck_quit             = lfsck_layout_slave_quit,
4825         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
4826         .lfsck_query            = lfsck_layout_query,
4827         .lfsck_join             = lfsck_layout_slave_join,
4828 };
4829
4830 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
4831 {
4832         struct lfsck_component  *com;
4833         struct lfsck_layout     *lo;
4834         struct dt_object        *root = NULL;
4835         struct dt_object        *obj;
4836         int                      rc;
4837         ENTRY;
4838
4839         OBD_ALLOC_PTR(com);
4840         if (com == NULL)
4841                 RETURN(-ENOMEM);
4842
4843         INIT_LIST_HEAD(&com->lc_link);
4844         INIT_LIST_HEAD(&com->lc_link_dir);
4845         init_rwsem(&com->lc_sem);
4846         atomic_set(&com->lc_ref, 1);
4847         com->lc_lfsck = lfsck;
4848         com->lc_type = LT_LAYOUT;
4849         if (lfsck->li_master) {
4850                 struct lfsck_layout_master_data *llmd;
4851
4852                 com->lc_ops = &lfsck_layout_master_ops;
4853                 OBD_ALLOC_PTR(llmd);
4854                 if (llmd == NULL)
4855                         GOTO(out, rc = -ENOMEM);
4856
4857                 INIT_LIST_HEAD(&llmd->llmd_req_list);
4858                 spin_lock_init(&llmd->llmd_lock);
4859                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
4860                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
4861                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
4862                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
4863                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
4864                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
4865                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
4866                 com->lc_data = llmd;
4867         } else {
4868                 struct lfsck_layout_slave_data *llsd;
4869
4870                 com->lc_ops = &lfsck_layout_slave_ops;
4871                 OBD_ALLOC_PTR(llsd);
4872                 if (llsd == NULL)
4873                         GOTO(out, rc = -ENOMEM);
4874
4875                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
4876                 INIT_LIST_HEAD(&llsd->llsd_master_list);
4877                 spin_lock_init(&llsd->llsd_lock);
4878                 llsd->llsd_rb_root = RB_ROOT;
4879                 rwlock_init(&llsd->llsd_rb_lock);
4880                 com->lc_data = llsd;
4881         }
4882         com->lc_file_size = sizeof(*lo);
4883         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
4884         if (com->lc_file_ram == NULL)
4885                 GOTO(out, rc = -ENOMEM);
4886
4887         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
4888         if (com->lc_file_disk == NULL)
4889                 GOTO(out, rc = -ENOMEM);
4890
4891         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
4892         if (IS_ERR(root))
4893                 GOTO(out, rc = PTR_ERR(root));
4894
4895         if (unlikely(!dt_try_as_dir(env, root)))
4896                 GOTO(out, rc = -ENOTDIR);
4897
4898         obj = local_file_find_or_create(env, lfsck->li_los, root,
4899                                         lfsck_layout_name,
4900                                         S_IFREG | S_IRUGO | S_IWUSR);
4901         if (IS_ERR(obj))
4902                 GOTO(out, rc = PTR_ERR(obj));
4903
4904         com->lc_obj = obj;
4905         rc = lfsck_layout_load(env, com);
4906         if (rc > 0)
4907                 rc = lfsck_layout_reset(env, com, true);
4908         else if (rc == -ENOENT)
4909                 rc = lfsck_layout_init(env, com);
4910
4911         if (rc != 0)
4912                 GOTO(out, rc);
4913
4914         lo = com->lc_file_ram;
4915         switch (lo->ll_status) {
4916         case LS_INIT:
4917         case LS_COMPLETED:
4918         case LS_FAILED:
4919         case LS_STOPPED:
4920         case LS_PARTIAL:
4921                 spin_lock(&lfsck->li_lock);
4922                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4923                 spin_unlock(&lfsck->li_lock);
4924                 break;
4925         default:
4926                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
4927                        lfsck_lfsck2name(lfsck), lo->ll_status);
4928                 /* fall through */
4929         case LS_SCANNING_PHASE1:
4930         case LS_SCANNING_PHASE2:
4931                 /* No need to store the status to disk right now.
4932                  * If the system crashed before the status stored,
4933                  * it will be loaded back when next time. */
4934                 lo->ll_status = LS_CRASHED;
4935                 lo->ll_flags |= LF_INCOMPLETE;
4936                 /* fall through */
4937         case LS_PAUSED:
4938         case LS_CRASHED:
4939         case LS_CO_FAILED:
4940         case LS_CO_STOPPED:
4941         case LS_CO_PAUSED:
4942                 spin_lock(&lfsck->li_lock);
4943                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
4944                 spin_unlock(&lfsck->li_lock);
4945                 break;
4946         }
4947
4948         if (lo->ll_flags & LF_CRASHED_LASTID) {
4949                 LASSERT(lfsck->li_out_notify != NULL);
4950
4951                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4952                                      LE_LASTID_REBUILDING);
4953         }
4954
4955         GOTO(out, rc = 0);
4956
4957 out:
4958         if (root != NULL && !IS_ERR(root))
4959                 lu_object_put(env, &root->do_lu);
4960
4961         if (rc != 0)
4962                 lfsck_component_cleanup(env, com);
4963
4964         return rc;
4965 }
4966
4967 struct lfsck_orphan_it {
4968         struct lfsck_component           *loi_com;
4969         struct lfsck_rbtree_node         *loi_lrn;
4970         struct lfsck_layout_slave_target *loi_llst;
4971         struct lu_fid                     loi_key;
4972         struct lu_orphan_rec              loi_rec;
4973         __u64                             loi_hash;
4974         unsigned int                      loi_over:1;
4975 };
4976
4977 static int lfsck_fid_match_idx(const struct lu_env *env,
4978                                struct lfsck_instance *lfsck,
4979                                const struct lu_fid *fid, int idx)
4980 {
4981         struct seq_server_site  *ss;
4982         struct lu_server_fld    *sf;
4983         struct lu_seq_range      range  = { 0 };
4984         int                      rc;
4985
4986         /* All abnormal cases will be returned to MDT0. */
4987         if (!fid_is_norm(fid)) {
4988                 if (idx == 0)
4989                         return 1;
4990
4991                 return 0;
4992         }
4993
4994         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4995         if (unlikely(ss == NULL))
4996                 return -ENOTCONN;
4997
4998         sf = ss->ss_server_fld;
4999         LASSERT(sf != NULL);
5000
5001         fld_range_set_any(&range);
5002         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
5003         if (rc != 0)
5004                 return rc;
5005
5006         if (!fld_range_is_mdt(&range))
5007                 return -EINVAL;
5008
5009         if (range.lsr_index == idx)
5010                 return 1;
5011
5012         return 0;
5013 }
5014
5015 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5016                                         struct dt_device *dev,
5017                                         struct dt_object *obj)
5018 {
5019         struct thandle *handle;
5020         int             rc;
5021         ENTRY;
5022
5023         handle = dt_trans_create(env, dev);
5024         if (IS_ERR(handle))
5025                 RETURN_EXIT;
5026
5027         rc = dt_declare_ref_del(env, obj, handle);
5028         if (rc != 0)
5029                 GOTO(stop, rc);
5030
5031         rc = dt_declare_destroy(env, obj, handle);
5032         if (rc != 0)
5033                 GOTO(stop, rc);
5034
5035         rc = dt_trans_start_local(env, dev, handle);
5036         if (rc != 0)
5037                 GOTO(stop, rc);
5038
5039         dt_write_lock(env, obj, 0);
5040         rc = dt_ref_del(env, obj, handle);
5041         if (rc == 0)
5042                 rc = dt_destroy(env, obj, handle);
5043         dt_write_unlock(env, obj);
5044
5045         GOTO(stop, rc);
5046
5047 stop:
5048         dt_trans_stop(env, dev, handle);
5049
5050         RETURN_EXIT;
5051 }
5052
/**
 * Lookup is not supported on the orphan index.
 *
 * \retval -EOPNOTSUPP	always
 */
static int
lfsck_orphan_index_lookup(const struct lu_env *env, struct dt_object *dt,
			  struct dt_rec *rec, const struct dt_key *key,
			  struct lustre_capa *capa)
{
	return -EOPNOTSUPP;
}
5061
/**
 * Declaring an insert is not supported on the orphan index.
 *
 * \retval -EOPNOTSUPP	always
 */
static int
lfsck_orphan_index_declare_insert(const struct lu_env *env,
				  struct dt_object *dt,
				  const struct dt_rec *rec,
				  const struct dt_key *key,
				  struct thandle *handle)
{
	return -EOPNOTSUPP;
}
5070
/**
 * Insert is not supported on the orphan index.
 *
 * \retval -EOPNOTSUPP	always
 */
static int
lfsck_orphan_index_insert(const struct lu_env *env, struct dt_object *dt,
			  const struct dt_rec *rec, const struct dt_key *key,
			  struct thandle *handle, struct lustre_capa *capa,
			  int ignore_quota)
{
	return -EOPNOTSUPP;
}
5081
/**
 * Declaring a delete is not supported on the orphan index.
 *
 * \retval -EOPNOTSUPP	always
 */
static int
lfsck_orphan_index_declare_delete(const struct lu_env *env,
				  struct dt_object *dt,
				  const struct dt_key *key,
				  struct thandle *handle)
{
	return -EOPNOTSUPP;
}
5089
/**
 * Delete is not supported on the orphan index.
 *
 * \retval -EOPNOTSUPP	always
 */
static int
lfsck_orphan_index_delete(const struct lu_env *env, struct dt_object *dt,
			  const struct dt_key *key, struct thandle *handle,
			  struct lustre_capa *capa)
{
	return -EOPNOTSUPP;
}
5098
5099 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
5100                                           struct dt_object *dt,
5101                                           __u32 attr,
5102                                           struct lustre_capa *capa)
5103 {
5104         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
5105         struct lfsck_instance           *lfsck;
5106         struct lfsck_component          *com    = NULL;
5107         struct lfsck_layout_slave_data  *llsd;
5108         struct lfsck_orphan_it          *it     = NULL;
5109         int                              rc     = 0;
5110         ENTRY;
5111
5112         lfsck = lfsck_instance_find(dev, true, false);
5113         if (unlikely(lfsck == NULL))
5114                 RETURN(ERR_PTR(-ENODEV));
5115
5116         com = lfsck_component_find(lfsck, LT_LAYOUT);
5117         if (unlikely(com == NULL))
5118                 GOTO(out, rc = -ENOENT);
5119
5120         llsd = com->lc_data;
5121         if (!llsd->llsd_rbtree_valid)
5122                 GOTO(out, rc = -ESRCH);
5123
5124         OBD_ALLOC_PTR(it);
5125         if (it == NULL)
5126                 GOTO(out, rc = -ENOMEM);
5127
5128         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
5129         if (it->loi_llst == NULL)
5130                 GOTO(out, rc = -ENODEV);
5131
5132         if (dev->dd_record_fid_accessed) {
5133                 /* The first iteration against the rbtree, scan the whole rbtree
5134                  * to remove the nodes which do NOT need to be handled. */
5135                 write_lock(&llsd->llsd_rb_lock);
5136                 if (dev->dd_record_fid_accessed) {
5137                         struct rb_node                  *node;
5138                         struct rb_node                  *next;
5139                         struct lfsck_rbtree_node        *lrn;
5140
5141                         /* No need to record the fid accessing anymore. */
5142                         dev->dd_record_fid_accessed = 0;
5143
5144                         node = rb_first(&llsd->llsd_rb_root);
5145                         while (node != NULL) {
5146                                 next = rb_next(node);
5147                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
5148                                                lrn_node);
5149                                 if (atomic_read(&lrn->lrn_known_count) <=
5150                                     atomic_read(&lrn->lrn_accessed_count)) {
5151                                         rb_erase(node, &llsd->llsd_rb_root);
5152                                         lfsck_rbtree_free(lrn);
5153                                 }
5154                                 node = next;
5155                         }
5156                 }
5157                 write_unlock(&llsd->llsd_rb_lock);
5158         }
5159
5160         /* read lock the rbtree when init, and unlock when fini */
5161         read_lock(&llsd->llsd_rb_lock);
5162         it->loi_com = com;
5163         com = NULL;
5164
5165         GOTO(out, rc = 0);
5166
5167 out:
5168         if (com != NULL)
5169                 lfsck_component_put(env, com);
5170         lfsck_instance_put(env, lfsck);
5171         if (rc != 0) {
5172                 if (it != NULL)
5173                         OBD_FREE_PTR(it);
5174
5175                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
5176         }
5177
5178         return (struct dt_it *)it;
5179 }
5180
5181 static void lfsck_orphan_it_fini(const struct lu_env *env,
5182                                  struct dt_it *di)
5183 {
5184         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
5185         struct lfsck_component           *com   = it->loi_com;
5186         struct lfsck_layout_slave_data   *llsd;
5187         struct lfsck_layout_slave_target *llst;
5188
5189         if (com != NULL) {
5190                 llsd = com->lc_data;
5191                 read_unlock(&llsd->llsd_rb_lock);
5192                 llst = it->loi_llst;
5193                 LASSERT(llst != NULL);
5194
5195                 /* Save the key and hash for iterate next. */
5196                 llst->llst_fid = it->loi_key;
5197                 llst->llst_hash = it->loi_hash;
5198                 lfsck_layout_llst_put(llst);
5199                 lfsck_component_put(env, com);
5200         }
5201         OBD_FREE_PTR(it);
5202 }
5203
5204 /**
5205  * \retval       +1: the iteration finished
5206  * \retval        0: on success, not finished
5207  * \retval      -ve: on error
5208  */
5209 static int lfsck_orphan_it_next(const struct lu_env *env,
5210                                 struct dt_it *di)
5211 {
5212         struct lfsck_thread_info        *info   = lfsck_env_info(env);
5213         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
5214         struct lu_attr                  *la     = &info->lti_la;
5215         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
5216         struct lu_fid                   *key    = &it->loi_key;
5217         struct lu_orphan_rec            *rec    = &it->loi_rec;
5218         struct lfsck_component          *com    = it->loi_com;
5219         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5220         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5221         struct dt_object                *obj;
5222         struct lfsck_rbtree_node        *lrn;
5223         int                              pos;
5224         int                              rc;
5225         __u32                            save;
5226         __u32                            idx    = it->loi_llst->llst_index;
5227         bool                             exact  = false;
5228         ENTRY;
5229
5230         if (it->loi_over)
5231                 RETURN(1);
5232
5233 again0:
5234         lrn = it->loi_lrn;
5235         if (lrn == NULL) {
5236                 lrn = lfsck_rbtree_search(llsd, key, &exact);
5237                 if (lrn == NULL) {
5238                         it->loi_over = 1;
5239                         RETURN(1);
5240                 }
5241
5242                 it->loi_lrn = lrn;
5243                 if (!exact) {
5244                         key->f_seq = lrn->lrn_seq;
5245                         key->f_oid = lrn->lrn_first_oid;
5246                         key->f_ver = 0;
5247                 }
5248         } else {
5249                 key->f_oid++;
5250                 if (unlikely(key->f_oid == 0)) {
5251                         key->f_seq++;
5252                         it->loi_lrn = NULL;
5253                         goto again0;
5254                 }
5255
5256                 if (key->f_oid >=
5257                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
5258                         it->loi_lrn = NULL;
5259                         goto again0;
5260                 }
5261         }
5262
5263         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
5264                      atomic_read(&lrn->lrn_accessed_count))) {
5265                 struct rb_node *next = rb_next(&lrn->lrn_node);
5266
5267                 while (next != NULL) {
5268                         lrn = rb_entry(next, struct lfsck_rbtree_node,
5269                                        lrn_node);
5270                         if (atomic_read(&lrn->lrn_known_count) >
5271                             atomic_read(&lrn->lrn_accessed_count))
5272                                 break;
5273                         next = rb_next(next);
5274                 }
5275
5276                 if (next == NULL) {
5277                         it->loi_over = 1;
5278                         RETURN(1);
5279                 }
5280
5281                 it->loi_lrn = lrn;
5282                 key->f_seq = lrn->lrn_seq;
5283                 key->f_oid = lrn->lrn_first_oid;
5284                 key->f_ver = 0;
5285         }
5286
5287         pos = key->f_oid - lrn->lrn_first_oid;
5288
5289 again1:
5290         pos = find_next_bit(lrn->lrn_known_bitmap,
5291                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
5292         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
5293                 key->f_oid = lrn->lrn_first_oid + pos;
5294                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
5295                         key->f_seq++;
5296                         key->f_oid = 0;
5297                 }
5298                 it->loi_lrn = NULL;
5299                 goto again0;
5300         }
5301
5302         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
5303                 pos++;
5304                 goto again1;
5305         }
5306
5307         key->f_oid = lrn->lrn_first_oid + pos;
5308         obj = lfsck_object_find(env, lfsck, key);
5309         if (IS_ERR(obj)) {
5310                 rc = PTR_ERR(obj);
5311                 if (rc == -ENOENT) {
5312                         pos++;
5313                         goto again1;
5314                 }
5315                 RETURN(rc);
5316         }
5317
5318         dt_read_lock(env, obj, 0);
5319         if (!dt_object_exists(obj)) {
5320                 dt_read_unlock(env, obj);
5321                 lfsck_object_put(env, obj);
5322                 pos++;
5323                 goto again1;
5324         }
5325
5326         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
5327         if (rc != 0)
5328                 GOTO(out, rc);
5329
5330         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
5331                           XATTR_NAME_FID, BYPASS_CAPA);
5332         if (rc == -ENODATA) {
5333                 /* For the pre-created OST-object, update the bitmap to avoid
5334                  * others LFSCK (second phase) iteration to touch it again. */
5335                 if (la->la_ctime == 0) {
5336                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
5337                                 atomic_inc(&lrn->lrn_accessed_count);
5338
5339                         /* For the race between repairing dangling referenced
5340                          * MDT-object and unlink the file, it may left orphan
5341                          * OST-object there. Destroy it now! */
5342                         if (unlikely(!(la->la_mode & S_ISUID))) {
5343                                 dt_read_unlock(env, obj);
5344                                 lfsck_layout_destroy_orphan(env,
5345                                                             lfsck->li_bottom,
5346                                                             obj);
5347                                 lfsck_object_put(env, obj);
5348                                 pos++;
5349                                 goto again1;
5350                         }
5351                 } else if (idx == 0) {
5352                         /* If the orphan OST-object has no parent information,
5353                          * regard it as referenced by the MDT-object on MDT0. */
5354                         fid_zero(&rec->lor_fid);
5355                         rec->lor_uid = la->la_uid;
5356                         rec->lor_gid = la->la_gid;
5357                         GOTO(out, rc = 0);
5358                 }
5359
5360                 dt_read_unlock(env, obj);
5361                 lfsck_object_put(env, obj);
5362                 pos++;
5363                 goto again1;
5364         }
5365
5366         if (rc < 0)
5367                 GOTO(out, rc);
5368
5369         if (rc != sizeof(struct filter_fid) &&
5370             rc != sizeof(struct filter_fid_old))
5371                 GOTO(out, rc = -EINVAL);
5372
5373         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
5374         /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
5375          * instead, it is the OST-object index in its parent MDT-object
5376          * layout EA. */
5377         save = rec->lor_fid.f_ver;
5378         rec->lor_fid.f_ver = 0;
5379         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
5380         /* If the orphan OST-object does not claim the MDT, then next.
5381          *
5382          * If we do not know whether it matches or not, then return it
5383          * to the MDT for further check. */
5384         if (rc == 0) {
5385                 dt_read_unlock(env, obj);
5386                 lfsck_object_put(env, obj);
5387                 pos++;
5388                 goto again1;
5389         }
5390
5391         rec->lor_fid.f_ver = save;
5392         rec->lor_uid = la->la_uid;
5393         rec->lor_gid = la->la_gid;
5394
5395         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
5396                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
5397                rec->lor_uid, rec->lor_gid);
5398
5399         GOTO(out, rc = 0);
5400
5401 out:
5402         dt_read_unlock(env, obj);
5403         lfsck_object_put(env, obj);
5404         if (rc == 0)
5405                 it->loi_hash++;
5406
5407         return rc;
5408 }
5409
5410 /**
5411  * \retval       +1: locate to the exactly position
5412  * \retval        0: cannot locate to the exactly position,
5413  *                   call next() to move to a valid position.
5414  * \retval      -ve: on error
5415  */
5416 static int lfsck_orphan_it_get(const struct lu_env *env,
5417                                struct dt_it *di,
5418                                const struct dt_key *key)
5419 {
5420         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
5421         int                      rc;
5422
5423         it->loi_key = *(struct lu_fid *)key;
5424         rc = lfsck_orphan_it_next(env, di);
5425         if (rc == 1)
5426                 return 0;
5427
5428         if (rc == 0)
5429                 return 1;
5430
5431         return rc;
5432 }
5433
/**
 * The "put" iterator method for the orphan index.
 *
 * Intentionally empty: this implementation has nothing to do here.
 *
 * \param[in] env	pointer to the thread context (unused)
 * \param[in] di	the iterator (unused)
 */
static void lfsck_orphan_it_put(const struct lu_env *env,
				struct dt_it *di)
{
	/* Nothing to do. */
}
5438
5439 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
5440                                           const struct dt_it *di)
5441 {
5442         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5443
5444         return (struct dt_key *)&it->loi_key;
5445 }
5446
5447 static int lfsck_orphan_it_key_size(const struct lu_env *env,
5448                                     const struct dt_it *di)
5449 {
5450         return sizeof(struct lu_fid);
5451 }
5452
5453 static int lfsck_orphan_it_rec(const struct lu_env *env,
5454                                const struct dt_it *di,
5455                                struct dt_rec *rec,
5456                                __u32 attr)
5457 {
5458         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
5459
5460         *(struct lu_orphan_rec *)rec = it->loi_rec;
5461
5462         return 0;
5463 }
5464
5465 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
5466                                    const struct dt_it *di)
5467 {
5468         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
5469
5470         return it->loi_hash;
5471 }
5472
5473 /**
5474  * \retval       +1: locate to the exactly position
5475  * \retval        0: cannot locate to the exactly position,
5476  *                   call next() to move to a valid position.
5477  * \retval      -ve: on error
5478  */
5479 static int lfsck_orphan_it_load(const struct lu_env *env,
5480                                 const struct dt_it *di,
5481                                 __u64 hash)
5482 {
5483         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
5484         struct lfsck_layout_slave_target *llst = it->loi_llst;
5485         int                               rc;
5486
5487         LASSERT(llst != NULL);
5488
5489         if (hash != llst->llst_hash) {
5490                 CWARN("%s: the given hash "LPU64" for orphan iteration does "
5491                       "not match the one when fini "LPU64", to be reset.\n",
5492                       lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
5493                       llst->llst_hash);
5494                 fid_zero(&llst->llst_fid);
5495                 llst->llst_hash = 0;
5496         }
5497
5498         it->loi_key = llst->llst_fid;
5499         it->loi_hash = llst->llst_hash;
5500         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
5501         if (rc == 1)
5502                 return 0;
5503
5504         if (rc == 0)
5505                 return 1;
5506
5507         return rc;
5508 }
5509
/**
 * The "key_rec" iterator method for the orphan index.
 *
 * This implementation does not touch any of its arguments.
 *
 * \param[in] env	pointer to the thread context (unused)
 * \param[in] di	the iterator (unused)
 * \param[in] key_rec	caller's buffer (unused, left unmodified)
 *
 * \retval		0: always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
				   const struct dt_it *di,
				   void *key_rec)
{
	/* Nothing is filled in; just report success. */
	return 0;
}
5516
5517 const struct dt_index_operations lfsck_orphan_index_ops = {
5518         .dio_lookup             = lfsck_orphan_index_lookup,
5519         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
5520         .dio_insert             = lfsck_orphan_index_insert,
5521         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
5522         .dio_delete             = lfsck_orphan_index_delete,
5523         .dio_it = {
5524                 .init           = lfsck_orphan_it_init,
5525                 .fini           = lfsck_orphan_it_fini,
5526                 .get            = lfsck_orphan_it_get,
5527                 .put            = lfsck_orphan_it_put,
5528                 .next           = lfsck_orphan_it_next,
5529                 .key            = lfsck_orphan_it_key,
5530                 .key_size       = lfsck_orphan_it_key_size,
5531                 .rec            = lfsck_orphan_it_rec,
5532                 .store          = lfsck_orphan_it_store,
5533                 .load           = lfsck_orphan_it_load,
5534                 .key_rec        = lfsck_orphan_it_key_rec,
5535         }
5536 };