Whamcloud - gitweb
LU-3336 lfsck: orphan OST-objects iteration
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_linkea.h>
43 #include <lustre_fid.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <md_object.h>
48 #include <obd_class.h>
49
50 #include "lfsck_internal.h"
51
/* Magic number of the lfsck_layout record: stored in ll_magic when the
 * record is initialized and verified when the record is loaded. */
#define LFSCK_LAYOUT_MAGIC              0xB173AE14

/* Name string for the layout LFSCK component.
 * NOTE(review): its users are outside this excerpt - confirm the usage. */
static const char lfsck_layout_name[] = "lfsck_layout";
55
56 struct lfsck_layout_seq {
57         struct list_head         lls_list;
58         __u64                    lls_seq;
59         __u64                    lls_lastid;
60         __u64                    lls_lastid_known;
61         struct dt_object        *lls_lastid_obj;
62         unsigned int             lls_dirty:1;
63 };
64
65 struct lfsck_layout_slave_target {
66         /* link into lfsck_layout_slave_data::llsd_master_list. */
67         struct list_head        llst_list;
68         /* The position for next record in the rbtree for iteration. */
69         struct lu_fid           llst_fid;
70         /* Dummy hash for iteration against the rbtree. */
71         __u64                   llst_hash;
72         __u64                   llst_gen;
73         atomic_t                llst_ref;
74         __u32                   llst_index;
75 };
76
77 struct lfsck_layout_slave_data {
78         /* list for lfsck_layout_seq */
79         struct list_head         llsd_seq_list;
80
81         /* list for the masters involve layout verification. */
82         struct list_head         llsd_master_list;
83         spinlock_t               llsd_lock;
84         __u64                    llsd_touch_gen;
85         struct dt_object        *llsd_rb_obj;
86         struct rb_root           llsd_rb_root;
87         rwlock_t                 llsd_rb_lock;
88         unsigned int             llsd_rbtree_valid:1;
89 };
90
91 struct lfsck_layout_object {
92         struct dt_object        *llo_obj;
93         struct lu_attr           llo_attr;
94         atomic_t                 llo_ref;
95         __u16                    llo_gen;
96 };
97
98 struct lfsck_layout_req {
99         struct list_head                 llr_list;
100         struct lfsck_layout_object      *llr_parent;
101         struct dt_object                *llr_child;
102         __u32                            llr_ost_idx;
103         __u32                            llr_lov_idx; /* offset in LOV EA */
104 };
105
106 struct lfsck_layout_master_data {
107         spinlock_t              llmd_lock;
108         struct list_head        llmd_req_list;
109
110         /* list for the ost targets involve layout verification. */
111         struct list_head        llmd_ost_list;
112
113         /* list for the ost targets in phase1 scanning. */
114         struct list_head        llmd_ost_phase1_list;
115
116         /* list for the ost targets in phase1 scanning. */
117         struct list_head        llmd_ost_phase2_list;
118
119         /* list for the mdt targets involve layout verification. */
120         struct list_head        llmd_mdt_list;
121
122         /* list for the mdt targets in phase1 scanning. */
123         struct list_head        llmd_mdt_phase1_list;
124
125         /* list for the mdt targets in phase1 scanning. */
126         struct list_head        llmd_mdt_phase2_list;
127
128         struct ptlrpc_thread    llmd_thread;
129         __u32                   llmd_touch_gen;
130         int                     llmd_prefetched;
131         int                     llmd_assistant_status;
132         int                     llmd_post_result;
133         unsigned int            llmd_to_post:1,
134                                 llmd_to_double_scan:1,
135                                 llmd_in_double_scan:1,
136                                 llmd_exit:1;
137 };
138
/* Argument bundle for an asynchronous request issued by the slave.
 * NOTE(review): the users are outside this excerpt - confirm the details. */
struct lfsck_layout_slave_async_args {
        /* Export the request is associated with. */
        struct obd_export                *llsaa_exp;
        /* The layout LFSCK component. */
        struct lfsck_component           *llsaa_com;
        /* The master target entry the request is about. */
        struct lfsck_layout_slave_target *llsaa_llst;
};
144
145 static struct lfsck_layout_object *
146 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
147                          __u16 gen)
148 {
149         struct lfsck_layout_object *llo;
150         int                         rc;
151
152         OBD_ALLOC_PTR(llo);
153         if (llo == NULL)
154                 return ERR_PTR(-ENOMEM);
155
156         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
157         if (rc != 0) {
158                 OBD_FREE_PTR(llo);
159
160                 return ERR_PTR(rc);
161         }
162
163         lu_object_get(&obj->do_lu);
164         llo->llo_obj = obj;
165         /* The gen can be used to check whether some others have changed the
166          * file layout after LFSCK pre-fetching but before real verification. */
167         llo->llo_gen = gen;
168         atomic_set(&llo->llo_ref, 1);
169
170         return llo;
171 }
172
173 static inline void
174 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
175 {
176         if (atomic_dec_and_test(&llst->llst_ref)) {
177                 LASSERT(list_empty(&llst->llst_list));
178
179                 OBD_FREE_PTR(llst);
180         }
181 }
182
183 static inline int
184 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
185 {
186         struct lfsck_layout_slave_target *llst;
187         struct lfsck_layout_slave_target *tmp;
188         int                               rc   = 0;
189
190         OBD_ALLOC_PTR(llst);
191         if (llst == NULL)
192                 return -ENOMEM;
193
194         INIT_LIST_HEAD(&llst->llst_list);
195         llst->llst_gen = 0;
196         llst->llst_index = index;
197         atomic_set(&llst->llst_ref, 1);
198
199         spin_lock(&llsd->llsd_lock);
200         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
201                 if (tmp->llst_index == index) {
202                         rc = -EALREADY;
203                         break;
204                 }
205         }
206         if (rc == 0)
207                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
208         spin_unlock(&llsd->llsd_lock);
209
210         if (rc != 0)
211                 OBD_FREE_PTR(llst);
212
213         return rc;
214 }
215
216 static inline void
217 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
218                       struct lfsck_layout_slave_target *llst)
219 {
220         bool del = false;
221
222         spin_lock(&llsd->llsd_lock);
223         if (!list_empty(&llst->llst_list)) {
224                 list_del_init(&llst->llst_list);
225                 del = true;
226         }
227         spin_unlock(&llsd->llsd_lock);
228
229         if (del)
230                 lfsck_layout_llst_put(llst);
231 }
232
233 static inline struct lfsck_layout_slave_target *
234 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
235                                __u32 index, bool unlink)
236 {
237         struct lfsck_layout_slave_target *llst;
238
239         spin_lock(&llsd->llsd_lock);
240         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
241                 if (llst->llst_index == index) {
242                         if (unlink)
243                                 list_del_init(&llst->llst_list);
244                         else
245                                 atomic_inc(&llst->llst_ref);
246                         spin_unlock(&llsd->llsd_lock);
247
248                         return llst;
249                 }
250         }
251         spin_unlock(&llsd->llsd_lock);
252
253         return NULL;
254 }
255
256 static inline void lfsck_layout_object_put(const struct lu_env *env,
257                                            struct lfsck_layout_object *llo)
258 {
259         if (atomic_dec_and_test(&llo->llo_ref)) {
260                 lfsck_object_put(env, llo->llo_obj);
261                 OBD_FREE_PTR(llo);
262         }
263 }
264
265 static struct lfsck_layout_req *
266 lfsck_layout_req_init(struct lfsck_layout_object *parent,
267                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
268 {
269         struct lfsck_layout_req *llr;
270
271         OBD_ALLOC_PTR(llr);
272         if (llr == NULL)
273                 return ERR_PTR(-ENOMEM);
274
275         INIT_LIST_HEAD(&llr->llr_list);
276         atomic_inc(&parent->llo_ref);
277         llr->llr_parent = parent;
278         llr->llr_child = child;
279         llr->llr_ost_idx = ost_idx;
280         llr->llr_lov_idx = lov_idx;
281
282         return llr;
283 }
284
285 static inline void lfsck_layout_req_fini(const struct lu_env *env,
286                                          struct lfsck_layout_req *llr)
287 {
288         lu_object_put(env, &llr->llr_child->do_lu);
289         lfsck_layout_object_put(env, llr->llr_parent);
290         OBD_FREE_PTR(llr);
291 }
292
293 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
294 {
295         bool empty = false;
296
297         spin_lock(&llmd->llmd_lock);
298         if (list_empty(&llmd->llmd_req_list))
299                 empty = true;
300         spin_unlock(&llmd->llmd_lock);
301
302         return empty;
303 }
304
305 static int lfsck_layout_get_lovea(const struct lu_env *env,
306                                   struct dt_object *obj,
307                                   struct lu_buf *buf, ssize_t *buflen)
308 {
309         int rc;
310
311 again:
312         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
313         if (rc == -ERANGE) {
314                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
315                                   BYPASS_CAPA);
316                 if (rc <= 0)
317                         return rc;
318
319                 lu_buf_realloc(buf, rc);
320                 if (buflen != NULL)
321                         *buflen = buf->lb_len;
322
323                 if (buf->lb_buf == NULL)
324                         return -ENOMEM;
325
326                 goto again;
327         }
328
329         if (rc == -ENODATA)
330                 rc = 0;
331
332         if (rc <= 0)
333                 return rc;
334
335         if (unlikely(buf->lb_buf == NULL)) {
336                 lu_buf_alloc(buf, rc);
337                 if (buflen != NULL)
338                         *buflen = buf->lb_len;
339
340                 if (buf->lb_buf == NULL)
341                         return -ENOMEM;
342
343                 goto again;
344         }
345
346         return rc;
347 }
348
349 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
350 {
351         __u32 magic;
352         __u32 patten;
353
354         magic = le32_to_cpu(lmm->lmm_magic);
355         /* If magic crashed, keep it there. Sometime later, during OST-object
356          * orphan handling, if some OST-object(s) back-point to it, it can be
357          * verified and repaired. */
358         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
359                 return -EINVAL;
360
361         patten = le32_to_cpu(lmm->lmm_pattern);
362         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
363         if (patten != LOV_PATTERN_RAID0)
364                 return -EOPNOTSUPP;
365
366         return 0;
367 }
368
/*
 * Geometry of the per-node bitmaps in the layout rbtree:
 *
 * SIZE  - size of one bitmap in bytes (one page).
 * WIDTH - number of bits in one bitmap, i.e. the number of consecutive
 *         OIDs covered by a single rbtree node.
 * MASK  - mask for the bit index of an OID inside its node; "oid & ~MASK"
 *         is the first OID of the node.
 *
 * The mask must be derived from the WIDTH (bits), not from the SIZE
 * (bytes): a node covers WIDTH OIDs, so with a "SIZE - 1" mask different
 * OIDs of the same node would share one bit and the node base would not
 * be aligned with the range that the node covers.
 */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
372
373 struct lfsck_rbtree_node {
374         struct rb_node   lrn_node;
375         __u64            lrn_seq;
376         __u32            lrn_first_oid;
377         atomic_t         lrn_known_count;
378         atomic_t         lrn_accessed_count;
379         void            *lrn_known_bitmap;
380         void            *lrn_accessed_bitmap;
381 };
382
383 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
384                                    __u64 seq, __u32 oid)
385 {
386         if (seq < lrn->lrn_seq)
387                 return -1;
388
389         if (seq > lrn->lrn_seq)
390                 return 1;
391
392         if (oid < lrn->lrn_first_oid)
393                 return -1;
394
395         if (oid >= lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH)
396                 return 1;
397
398         return 0;
399 }
400
401 /* The caller should hold llsd->llsd_rb_lock. */
402 static struct lfsck_rbtree_node *
403 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
404                     const struct lu_fid *fid, bool *exact)
405 {
406         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
407         struct rb_node           *prev  = NULL;
408         struct lfsck_rbtree_node *lrn   = NULL;
409         int                       rc    = 0;
410
411         if (exact != NULL)
412                 *exact = true;
413
414         while (node != NULL) {
415                 prev = node;
416                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
417                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
418                 if (rc < 0)
419                         node = node->rb_left;
420                 else if (rc > 0)
421                         node = node->rb_right;
422                 else
423                         return lrn;
424         }
425
426         if (exact == NULL)
427                 return NULL;
428
429         /* If there is no exactly matched one, then to the next valid one. */
430         *exact = false;
431
432         /* The rbtree is empty. */
433         if (rc == 0)
434                 return NULL;
435
436         if (rc < 0)
437                 return lrn;
438
439         node = rb_next(prev);
440
441         /* The end of the rbtree. */
442         if (node == NULL)
443                 return NULL;
444
445         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
446
447         return lrn;
448 }
449
450 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
451                                                   const struct lu_fid *fid)
452 {
453         struct lfsck_rbtree_node *lrn;
454
455         OBD_ALLOC_PTR(lrn);
456         if (lrn == NULL)
457                 return ERR_PTR(-ENOMEM);
458
459         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
460         if (lrn->lrn_known_bitmap == NULL) {
461                 OBD_FREE_PTR(lrn);
462
463                 return ERR_PTR(-ENOMEM);
464         }
465
466         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
467         if (lrn->lrn_accessed_bitmap == NULL) {
468                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
469                 OBD_FREE_PTR(lrn);
470
471                 return ERR_PTR(-ENOMEM);
472         }
473
474         rb_init_node(&lrn->lrn_node);
475         lrn->lrn_seq = fid_seq(fid);
476         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
477         atomic_set(&lrn->lrn_known_count, 0);
478         atomic_set(&lrn->lrn_accessed_count, 0);
479
480         return lrn;
481 }
482
483 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
484 {
485         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
486         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
487         OBD_FREE_PTR(lrn);
488 }
489
490 /* The caller should hold lock. */
491 static struct lfsck_rbtree_node *
492 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
493                     struct lfsck_rbtree_node *lrn)
494 {
495         struct rb_node           **pos    = &(llsd->llsd_rb_root.rb_node);
496         struct rb_node            *parent = NULL;
497         struct lfsck_rbtree_node  *tmp;
498         int                        rc;
499
500         while (*pos) {
501                 parent = *pos;
502                 tmp = rb_entry(*pos, struct lfsck_rbtree_node, lrn_node);
503                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
504                 if (rc < 0)
505                         pos = &((*pos)->rb_left);
506                 else if (rc > 0)
507                         pos = &((*pos)->rb_right);
508                 else
509                         return tmp;
510         }
511
512         rb_link_node(&lrn->lrn_node, parent, pos);
513         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
514
515         return lrn;
516 }
517
518 extern const struct dt_index_operations lfsck_orphan_index_ops;
519
520 static int lfsck_rbtree_setup(const struct lu_env *env,
521                               struct lfsck_component *com)
522 {
523         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
524         struct lfsck_instance           *lfsck  = com->lc_lfsck;
525         struct dt_device                *dev    = lfsck->li_bottom;
526         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
527         struct dt_object                *obj;
528
529         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
530         fid->f_oid = lfsck_dev_idx(dev);
531         fid->f_ver = 0;
532         obj = dt_locate(env, dev, fid);
533         if (IS_ERR(obj))
534                 RETURN(PTR_ERR(obj));
535
536         /* Generate an in-RAM object to stand for the layout rbtree.
537          * Scanning the layout rbtree will be via the iteration over
538          * the object. In the future, the rbtree may be written onto
539          * disk with the object.
540          *
541          * Mark the object to be as exist. */
542         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
543         obj->do_index_ops = &lfsck_orphan_index_ops;
544         llsd->llsd_rb_obj = obj;
545         llsd->llsd_rbtree_valid = 1;
546         dev->dd_record_fid_accessed = 1;
547
548         return 0;
549 }
550
551 static void lfsck_rbtree_cleanup(const struct lu_env *env,
552                                  struct lfsck_component *com)
553 {
554         struct lfsck_instance           *lfsck = com->lc_lfsck;
555         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
556         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
557         struct rb_node                  *next;
558         struct lfsck_rbtree_node        *lrn;
559
560         lfsck->li_bottom->dd_record_fid_accessed = 0;
561         /* Invalid the rbtree, then no others will use it. */
562         write_lock(&llsd->llsd_rb_lock);
563         llsd->llsd_rbtree_valid = 0;
564         write_unlock(&llsd->llsd_rb_lock);
565
566         while (node != NULL) {
567                 next = rb_next(node);
568                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
569                 rb_erase(node, &llsd->llsd_rb_root);
570                 lfsck_rbtree_free(lrn);
571                 node = next;
572         }
573
574         if (llsd->llsd_rb_obj != NULL) {
575                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
576                 llsd->llsd_rb_obj = NULL;
577         }
578 }
579
580 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
581                                        struct lfsck_component *com,
582                                        const struct lu_fid *fid,
583                                        bool accessed)
584 {
585         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
586         struct lfsck_rbtree_node        *lrn;
587         bool                             insert = false;
588         int                              idx;
589         int                              rc     = 0;
590         ENTRY;
591
592         CDEBUG(D_LFSCK, "%s: update bitmap for "DFID"\n",
593                lfsck_lfsck2name(com->lc_lfsck), PFID(fid));
594
595         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
596                 RETURN_EXIT;
597
598         if (!fid_is_idif(fid) && !fid_is_norm(fid))
599                 RETURN_EXIT;
600
601         read_lock(&llsd->llsd_rb_lock);
602         if (!llsd->llsd_rbtree_valid)
603                 GOTO(unlock, rc = 0);
604
605         lrn = lfsck_rbtree_search(llsd, fid, NULL);
606         if (lrn == NULL) {
607                 struct lfsck_rbtree_node *tmp;
608
609                 LASSERT(!insert);
610
611                 read_unlock(&llsd->llsd_rb_lock);
612                 tmp = lfsck_rbtree_new(env, fid);
613                 if (IS_ERR(tmp))
614                         GOTO(out, rc = PTR_ERR(tmp));
615
616                 insert = true;
617                 write_lock(&llsd->llsd_rb_lock);
618                 if (!llsd->llsd_rbtree_valid) {
619                         lfsck_rbtree_free(tmp);
620                         GOTO(unlock, rc = 0);
621                 }
622
623                 lrn = lfsck_rbtree_insert(llsd, tmp);
624                 if (lrn != tmp)
625                         lfsck_rbtree_free(tmp);
626         }
627
628         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
629         /* Any accessed object must be a known object. */
630         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
631                 atomic_inc(&lrn->lrn_known_count);
632         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
633                 atomic_inc(&lrn->lrn_accessed_count);
634
635         GOTO(unlock, rc = 0);
636
637 unlock:
638         if (insert)
639                 write_unlock(&llsd->llsd_rb_lock);
640         else
641                 read_unlock(&llsd->llsd_rb_lock);
642 out:
643         if (rc != 0 && accessed) {
644                 struct lfsck_layout *lo = com->lc_file_ram;
645
646                 CERROR("%s: Fail to update object accessed bitmap, will cause "
647                        "incorrect LFSCK OST-object handling, so disable it to "
648                        "cancel orphan handling for related device. rc = %d.\n",
649                        lfsck_lfsck2name(com->lc_lfsck), rc);
650                 lo->ll_flags |= LF_INCOMPLETE;
651                 lfsck_rbtree_cleanup(env, com);
652         }
653 }
654
655 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
656                                    const struct lfsck_layout *src)
657 {
658         int i;
659
660         des->ll_magic = le32_to_cpu(src->ll_magic);
661         des->ll_status = le32_to_cpu(src->ll_status);
662         des->ll_flags = le32_to_cpu(src->ll_flags);
663         des->ll_success_count = le32_to_cpu(src->ll_success_count);
664         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
665         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
666         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
667         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
668         des->ll_time_last_checkpoint =
669                                 le64_to_cpu(src->ll_time_last_checkpoint);
670         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
671         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
672         des->ll_pos_first_inconsistent =
673                         le64_to_cpu(src->ll_pos_first_inconsistent);
674         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
675         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
676         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
677         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
678         for (i = 0; i < LLIT_MAX; i++)
679                 des->ll_objs_repaired[i] =
680                                 le64_to_cpu(src->ll_objs_repaired[i]);
681         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
682 }
683
684 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
685                                    const struct lfsck_layout *src)
686 {
687         int i;
688
689         des->ll_magic = cpu_to_le32(src->ll_magic);
690         des->ll_status = cpu_to_le32(src->ll_status);
691         des->ll_flags = cpu_to_le32(src->ll_flags);
692         des->ll_success_count = cpu_to_le32(src->ll_success_count);
693         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
694         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
695         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
696         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
697         des->ll_time_last_checkpoint =
698                                 cpu_to_le64(src->ll_time_last_checkpoint);
699         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
700         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
701         des->ll_pos_first_inconsistent =
702                         cpu_to_le64(src->ll_pos_first_inconsistent);
703         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
704         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
705         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
706         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
707         for (i = 0; i < LLIT_MAX; i++)
708                 des->ll_objs_repaired[i] =
709                                 cpu_to_le64(src->ll_objs_repaired[i]);
710         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
711 }
712
713 /**
714  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
715  * \retval 0: succeed.
716  * \retval -ve: failed cases.
717  */
718 static int lfsck_layout_load(const struct lu_env *env,
719                              struct lfsck_component *com)
720 {
721         struct lfsck_layout             *lo     = com->lc_file_ram;
722         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
723         ssize_t                          size   = com->lc_file_size;
724         loff_t                           pos    = 0;
725         int                              rc;
726
727         rc = dbo->dbo_read(env, com->lc_obj,
728                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
729                            BYPASS_CAPA);
730         if (rc == 0) {
731                 return -ENOENT;
732         } else if (rc < 0) {
733                 CWARN("%s: failed to load lfsck_layout: rc = %d\n",
734                       lfsck_lfsck2name(com->lc_lfsck), rc);
735                 return rc;
736         } else if (rc != size) {
737                 CWARN("%s: crashed lfsck_layout, to be reset: rc = %d\n",
738                       lfsck_lfsck2name(com->lc_lfsck), rc);
739                 return 1;
740         }
741
742         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
743         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
744                 CWARN("%s: invalid lfsck_layout magic %#x != %#x, "
745                       "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
746                       lo->ll_magic, LFSCK_LAYOUT_MAGIC);
747                 return 1;
748         }
749
750         return 0;
751 }
752
753 static int lfsck_layout_store(const struct lu_env *env,
754                               struct lfsck_component *com)
755 {
756         struct dt_object         *obj           = com->lc_obj;
757         struct lfsck_instance    *lfsck         = com->lc_lfsck;
758         struct lfsck_layout      *lo            = com->lc_file_disk;
759         struct thandle           *handle;
760         ssize_t                   size          = com->lc_file_size;
761         loff_t                    pos           = 0;
762         int                       rc;
763         ENTRY;
764
765         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
766         handle = dt_trans_create(env, lfsck->li_bottom);
767         if (IS_ERR(handle)) {
768                 rc = PTR_ERR(handle);
769                 CERROR("%s: fail to create trans for storing lfsck_layout: "
770                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
771                 RETURN(rc);
772         }
773
774         rc = dt_declare_record_write(env, obj, size, pos, handle);
775         if (rc != 0) {
776                 CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
777                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
778                 GOTO(out, rc);
779         }
780
781         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
782         if (rc != 0) {
783                 CERROR("%s: fail to start trans for storing lfsck_layout: "
784                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
785                 GOTO(out, rc);
786         }
787
788         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
789                              handle);
790         if (rc != 0)
791                 CERROR("%s: fail to store lfsck_layout(1): size = %d, "
792                        "rc = %d\n", lfsck_lfsck2name(lfsck), (int)size, rc);
793
794         GOTO(out, rc);
795
796 out:
797         dt_trans_stop(env, lfsck->li_bottom, handle);
798
799         return rc;
800 }
801
802 static int lfsck_layout_init(const struct lu_env *env,
803                              struct lfsck_component *com)
804 {
805         struct lfsck_layout *lo = com->lc_file_ram;
806         int rc;
807
808         memset(lo, 0, com->lc_file_size);
809         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
810         lo->ll_status = LS_INIT;
811         down_write(&com->lc_sem);
812         rc = lfsck_layout_store(env, com);
813         up_write(&com->lc_sem);
814
815         return rc;
816 }
817
818 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
819                              struct dt_object *obj, const struct lu_fid *fid)
820 {
821         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
822         struct lu_seq_range      range  = { 0 };
823         struct lustre_mdt_attrs *lma;
824         int                      rc;
825
826         fld_range_set_any(&range);
827         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
828         if (rc == 0) {
829                 if (fld_range_is_ost(&range))
830                         return 1;
831
832                 return 0;
833         }
834
835         lma = &lfsck_env_info(env)->lti_lma;
836         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
837                           XATTR_NAME_LMA, BYPASS_CAPA);
838         if (rc == sizeof(*lma)) {
839                 lustre_lma_swab(lma);
840
841                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
842         }
843
844         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
845
846         return rc > 0;
847 }
848
849 static struct lfsck_layout_seq *
850 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
851 {
852         struct lfsck_layout_seq *lls;
853
854         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
855                 if (lls->lls_seq == seq)
856                         return lls;
857
858                 if (lls->lls_seq > seq)
859                         return NULL;
860         }
861
862         return NULL;
863 }
864
865 static void
866 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
867                         struct lfsck_layout_seq *lls)
868 {
869         struct lfsck_layout_seq *tmp;
870         struct list_head        *pos = &llsd->llsd_seq_list;
871
872         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
873                 if (lls->lls_seq < tmp->lls_seq) {
874                         pos = &tmp->lls_list;
875                         break;
876                 }
877         }
878         list_add_tail(&lls->lls_list, pos);
879 }
880
881 static int
882 lfsck_layout_lastid_create(const struct lu_env *env,
883                            struct lfsck_instance *lfsck,
884                            struct dt_object *obj)
885 {
886         struct lfsck_thread_info *info   = lfsck_env_info(env);
887         struct lu_attr           *la     = &info->lti_la;
888         struct dt_object_format  *dof    = &info->lti_dof;
889         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
890         struct dt_device         *dt     = lfsck->li_bottom;
891         struct thandle           *th;
892         __u64                     lastid = 0;
893         loff_t                    pos    = 0;
894         int                       rc;
895         ENTRY;
896
897         CDEBUG(D_LFSCK, "To create LAST_ID for <seq> "LPX64"\n",
898                fid_seq(lfsck_dto2fid(obj)));
899
900         if (bk->lb_param & LPF_DRYRUN)
901                 return 0;
902
903         memset(la, 0, sizeof(*la));
904         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
905         la->la_valid = LA_MODE | LA_UID | LA_GID;
906         dof->dof_type = dt_mode_to_dft(S_IFREG);
907
908         th = dt_trans_create(env, dt);
909         if (IS_ERR(th))
910                 RETURN(rc = PTR_ERR(th));
911
912         rc = dt_declare_create(env, obj, la, NULL, dof, th);
913         if (rc != 0)
914                 GOTO(stop, rc);
915
916         rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
917         if (rc != 0)
918                 GOTO(stop, rc);
919
920         rc = dt_trans_start_local(env, dt, th);
921         if (rc != 0)
922                 GOTO(stop, rc);
923
924         dt_write_lock(env, obj, 0);
925         if (likely(!dt_object_exists(obj))) {
926                 rc = dt_create(env, obj, la, NULL, dof, th);
927                 if (rc == 0)
928                         rc = dt_record_write(env, obj,
929                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
930                                 &pos, th);
931         }
932         dt_write_unlock(env, obj);
933
934         GOTO(stop, rc);
935
936 stop:
937         dt_trans_stop(env, dt, th);
938
939         return rc;
940 }
941
942 static int
943 lfsck_layout_lastid_reload(const struct lu_env *env,
944                            struct lfsck_component *com,
945                            struct lfsck_layout_seq *lls)
946 {
947         __u64   lastid;
948         loff_t  pos     = 0;
949         int     rc;
950
951         dt_read_lock(env, lls->lls_lastid_obj, 0);
952         rc = dt_record_read(env, lls->lls_lastid_obj,
953                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
954         dt_read_unlock(env, lls->lls_lastid_obj);
955         if (unlikely(rc != 0))
956                 return rc;
957
958         lastid = le64_to_cpu(lastid);
959         if (lastid < lls->lls_lastid_known) {
960                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
961                 struct lfsck_layout     *lo     = com->lc_file_ram;
962
963                 lls->lls_lastid = lls->lls_lastid_known;
964                 lls->lls_dirty = 1;
965                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
966                         LASSERT(lfsck->li_out_notify != NULL);
967
968                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
969                                              LE_LASTID_REBUILDING);
970                         lo->ll_flags |= LF_CRASHED_LASTID;
971                 }
972         } else if (lastid >= lls->lls_lastid) {
973                 lls->lls_lastid = lastid;
974                 lls->lls_dirty = 0;
975         }
976
977         return 0;
978 }
979
980 static int
981 lfsck_layout_lastid_store(const struct lu_env *env,
982                           struct lfsck_component *com)
983 {
984         struct lfsck_instance           *lfsck  = com->lc_lfsck;
985         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
986         struct dt_device                *dt     = lfsck->li_bottom;
987         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
988         struct lfsck_layout_seq         *lls;
989         struct thandle                  *th;
990         __u64                            lastid;
991         int                              rc     = 0;
992         int                              rc1    = 0;
993
994         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
995                 loff_t pos = 0;
996
997                 /* XXX: Add the code back if we really found related
998                  *      inconsistent cases in the future. */
999 #if 0
1000                 if (!lls->lls_dirty) {
1001                         /* In OFD, before the pre-creation, the LAST_ID
1002                          * file will be updated firstly, which may hide
1003                          * some potential crashed cases. For example:
1004                          *
1005                          * The old obj1's ID is higher than old LAST_ID
1006                          * but lower than the new LAST_ID, but the LFSCK
1007                          * have not touch the obj1 until the OFD updated
1008                          * the LAST_ID. So the LFSCK does not regard it
1009                          * as crashed case. But when OFD does not create
1010                          * successfully, it will set the LAST_ID as the
1011                          * real created objects' ID, then LFSCK needs to
1012                          * found related inconsistency. */
1013                         rc = lfsck_layout_lastid_reload(env, com, lls);
1014                         if (likely(!lls->lls_dirty))
1015                                 continue;
1016                 }
1017 #endif
1018
1019                 CDEBUG(D_LFSCK, "To sync the LAST_ID for <seq> "LPX64
1020                        " as <oid> "LPU64"\n", lls->lls_seq, lls->lls_lastid);
1021
1022                 if (bk->lb_param & LPF_DRYRUN) {
1023                         lls->lls_dirty = 0;
1024                         continue;
1025                 }
1026
1027                 th = dt_trans_create(env, dt);
1028                 if (IS_ERR(th)) {
1029                         rc1 = PTR_ERR(th);
1030                         CERROR("%s: (1) failed to store "LPX64": rc = %d\n",
1031                                lfsck_lfsck2name(com->lc_lfsck),
1032                                lls->lls_seq, rc1);
1033                         continue;
1034                 }
1035
1036                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1037                                              sizeof(lastid), pos, th);
1038                 if (rc != 0)
1039                         goto stop;
1040
1041                 rc = dt_trans_start_local(env, dt, th);
1042                 if (rc != 0)
1043                         goto stop;
1044
1045                 lastid = cpu_to_le64(lls->lls_lastid);
1046                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1047                 rc = dt_record_write(env, lls->lls_lastid_obj,
1048                                      lfsck_buf_get(env, &lastid,
1049                                      sizeof(lastid)), &pos, th);
1050                 dt_write_unlock(env, lls->lls_lastid_obj);
1051                 if (rc == 0)
1052                         lls->lls_dirty = 0;
1053
1054 stop:
1055                 dt_trans_stop(env, dt, th);
1056                 if (rc != 0) {
1057                         rc1 = rc;
1058                         CERROR("%s: (2) failed to store "LPX64": rc = %d\n",
1059                                lfsck_lfsck2name(com->lc_lfsck),
1060                                lls->lls_seq, rc1);
1061                 }
1062         }
1063
1064         return rc1;
1065 }
1066
1067 static int
1068 lfsck_layout_lastid_load(const struct lu_env *env,
1069                          struct lfsck_component *com,
1070                          struct lfsck_layout_seq *lls)
1071 {
1072         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1073         struct lfsck_layout     *lo     = com->lc_file_ram;
1074         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1075         struct dt_object        *obj;
1076         loff_t                   pos    = 0;
1077         int                      rc;
1078         ENTRY;
1079
1080         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1081         obj = dt_locate(env, lfsck->li_bottom, fid);
1082         if (IS_ERR(obj))
1083                 RETURN(PTR_ERR(obj));
1084
1085         /* LAST_ID crashed, to be rebuilt */
1086         if (!dt_object_exists(obj)) {
1087                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1088                         LASSERT(lfsck->li_out_notify != NULL);
1089
1090                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1091                                              LE_LASTID_REBUILDING);
1092                         lo->ll_flags |= LF_CRASHED_LASTID;
1093
1094                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1095                             cfs_fail_val > 0) {
1096                                 struct l_wait_info lwi = LWI_TIMEOUT(
1097                                                 cfs_time_seconds(cfs_fail_val),
1098                                                 NULL, NULL);
1099
1100                                 up_write(&com->lc_sem);
1101                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1102                                              !thread_is_running(&lfsck->li_thread),
1103                                              &lwi);
1104                                 down_write(&com->lc_sem);
1105                         }
1106                 }
1107
1108                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1109         } else {
1110                 dt_read_lock(env, obj, 0);
1111                 rc = dt_read(env, obj,
1112                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1113                         &pos);
1114                 dt_read_unlock(env, obj);
1115                 if (rc != 0 && rc != sizeof(__u64))
1116                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1117
1118                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1119                         LASSERT(lfsck->li_out_notify != NULL);
1120
1121                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1122                                              LE_LASTID_REBUILDING);
1123                         lo->ll_flags |= LF_CRASHED_LASTID;
1124                 }
1125
1126                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1127                 rc = 0;
1128         }
1129
1130         GOTO(out, rc);
1131
1132 out:
1133         if (rc != 0)
1134                 lfsck_object_put(env, obj);
1135         else
1136                 lls->lls_lastid_obj = obj;
1137
1138         return rc;
1139 }
1140
1141 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1142                                                struct ptlrpc_request *req,
1143                                                void *args, int rc)
1144 {
1145         struct lfsck_async_interpret_args *laia = args;
1146         struct lfsck_component            *com  = laia->laia_com;
1147         struct lfsck_layout_master_data   *llmd = com->lc_data;
1148         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1149         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1150         struct lfsck_request              *lr   = laia->laia_lr;
1151
1152         switch (lr->lr_event) {
1153         case LE_START:
1154                 if (rc != 0) {
1155                         struct lfsck_layout *lo = com->lc_file_ram;
1156
1157                         CERROR("%s: fail to notify %s %x for layout start: "
1158                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1159                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1160                                ltd->ltd_index, rc);
1161                         lo->ll_flags |= LF_INCOMPLETE;
1162                         break;
1163                 }
1164
1165                 spin_lock(&ltds->ltd_lock);
1166                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1167                         spin_unlock(&ltds->ltd_lock);
1168                         break;
1169                 }
1170
1171                 if (lr->lr_flags & LEF_TO_OST) {
1172                         if (list_empty(&ltd->ltd_layout_list))
1173                                 list_add_tail(&ltd->ltd_layout_list,
1174                                               &llmd->llmd_ost_list);
1175                         if (list_empty(&ltd->ltd_layout_phase_list))
1176                                 list_add_tail(&ltd->ltd_layout_phase_list,
1177                                               &llmd->llmd_ost_phase1_list);
1178                 } else {
1179                         if (list_empty(&ltd->ltd_layout_list))
1180                                 list_add_tail(&ltd->ltd_layout_list,
1181                                               &llmd->llmd_mdt_list);
1182                         if (list_empty(&ltd->ltd_layout_phase_list))
1183                                 list_add_tail(&ltd->ltd_layout_phase_list,
1184                                               &llmd->llmd_mdt_phase1_list);
1185                 }
1186                 spin_unlock(&ltds->ltd_lock);
1187                 break;
1188         case LE_STOP:
1189         case LE_PHASE1_DONE:
1190         case LE_PHASE2_DONE:
1191         case LE_PEER_EXIT:
1192                 if (rc != 0 && rc != -EALREADY)
1193                         CWARN("%s: fail to notify %s %x for layout: "
1194                               "event = %d, rc = %d\n",
1195                               lfsck_lfsck2name(com->lc_lfsck),
1196                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1197                               ltd->ltd_index, lr->lr_event, rc);
1198                 break;
1199         case LE_QUERY: {
1200                 struct lfsck_reply *reply;
1201
1202                 if (rc != 0) {
1203                         spin_lock(&ltds->ltd_lock);
1204                         list_del_init(&ltd->ltd_layout_phase_list);
1205                         list_del_init(&ltd->ltd_layout_list);
1206                         spin_unlock(&ltds->ltd_lock);
1207                         break;
1208                 }
1209
1210                 reply = req_capsule_server_get(&req->rq_pill,
1211                                                &RMF_LFSCK_REPLY);
1212                 if (reply == NULL) {
1213                         rc = -EPROTO;
1214                         CERROR("%s: invalid return value: rc = %d\n",
1215                                lfsck_lfsck2name(com->lc_lfsck), rc);
1216                         spin_lock(&ltds->ltd_lock);
1217                         list_del_init(&ltd->ltd_layout_phase_list);
1218                         list_del_init(&ltd->ltd_layout_list);
1219                         spin_unlock(&ltds->ltd_lock);
1220                         break;
1221                 }
1222
1223                 switch (reply->lr_status) {
1224                 case LS_SCANNING_PHASE1:
1225                         break;
1226                 case LS_SCANNING_PHASE2:
1227                         spin_lock(&ltds->ltd_lock);
1228                         list_del_init(&ltd->ltd_layout_phase_list);
1229                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1230                                 spin_unlock(&ltds->ltd_lock);
1231                                 break;
1232                         }
1233
1234                         if (lr->lr_flags & LEF_TO_OST)
1235                                 list_add_tail(&ltd->ltd_layout_phase_list,
1236                                               &llmd->llmd_ost_phase2_list);
1237                         else
1238                                 list_add_tail(&ltd->ltd_layout_phase_list,
1239                                               &llmd->llmd_mdt_phase2_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 default:
1243                         spin_lock(&ltds->ltd_lock);
1244                         list_del_init(&ltd->ltd_layout_phase_list);
1245                         list_del_init(&ltd->ltd_layout_list);
1246                         spin_unlock(&ltds->ltd_lock);
1247                         break;
1248                 }
1249                 break;
1250         }
1251         default:
1252                 CERROR("%s: unexpected event: rc = %d\n",
1253                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1254                 break;
1255         }
1256
1257         if (!laia->laia_shared) {
1258                 lfsck_tgt_put(ltd);
1259                 lfsck_component_put(env, com);
1260         }
1261
1262         return 0;
1263 }
1264
1265 static int lfsck_layout_master_query_others(const struct lu_env *env,
1266                                             struct lfsck_component *com)
1267 {
1268         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1269         struct lfsck_request              *lr    = &info->lti_lr;
1270         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1271         struct lfsck_instance             *lfsck = com->lc_lfsck;
1272         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1273         struct ptlrpc_request_set         *set;
1274         struct lfsck_tgt_descs            *ltds;
1275         struct lfsck_tgt_desc             *ltd;
1276         struct list_head                  *head;
1277         int                                rc    = 0;
1278         int                                rc1   = 0;
1279         ENTRY;
1280
1281         set = ptlrpc_prep_set();
1282         if (set == NULL)
1283                 RETURN(-ENOMEM);
1284
1285         llmd->llmd_touch_gen++;
1286         memset(lr, 0, sizeof(*lr));
1287         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1288         lr->lr_event = LE_QUERY;
1289         lr->lr_active = LT_LAYOUT;
1290         laia->laia_com = com;
1291         laia->laia_lr = lr;
1292         laia->laia_shared = 0;
1293
1294         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1295                 ltds = &lfsck->li_mdt_descs;
1296                 lr->lr_flags = 0;
1297                 head = &llmd->llmd_mdt_phase1_list;
1298         } else {
1299
1300 again:
1301                 ltds = &lfsck->li_ost_descs;
1302                 lr->lr_flags = LEF_TO_OST;
1303                 head = &llmd->llmd_ost_phase1_list;
1304         }
1305
1306         laia->laia_ltds = ltds;
1307         spin_lock(&ltds->ltd_lock);
1308         while (!list_empty(head)) {
1309                 ltd = list_entry(head->next,
1310                                  struct lfsck_tgt_desc,
1311                                  ltd_layout_phase_list);
1312                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1313                         break;
1314
1315                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1316                 list_del(&ltd->ltd_layout_phase_list);
1317                 list_add_tail(&ltd->ltd_layout_phase_list, head);
1318                 atomic_inc(&ltd->ltd_ref);
1319                 laia->laia_ltd = ltd;
1320                 spin_unlock(&ltds->ltd_lock);
1321                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1322                                          lfsck_layout_master_async_interpret,
1323                                          laia, LFSCK_QUERY);
1324                 if (rc != 0) {
1325                         CERROR("%s: fail to query %s %x for layout: rc = %d\n",
1326                                lfsck_lfsck2name(lfsck),
1327                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1328                                ltd->ltd_index, rc);
1329                         lfsck_tgt_put(ltd);
1330                         rc1 = rc;
1331                 }
1332                 spin_lock(&ltds->ltd_lock);
1333         }
1334         spin_unlock(&ltds->ltd_lock);
1335
1336         rc = ptlrpc_set_wait(set);
1337         if (rc < 0) {
1338                 ptlrpc_set_destroy(set);
1339                 RETURN(rc);
1340         }
1341
1342         if (!(lr->lr_flags & LEF_TO_OST) &&
1343             list_empty(&llmd->llmd_mdt_phase1_list))
1344                 goto again;
1345
1346         ptlrpc_set_destroy(set);
1347
1348         RETURN(rc1 != 0 ? rc1 : rc);
1349 }
1350
1351 static inline bool
1352 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1353 {
1354         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1355                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1356                 list_empty(&llmd->llmd_ost_phase1_list));
1357 }
1358
1359 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1360                                              struct lfsck_component *com,
1361                                              struct lfsck_request *lr)
1362 {
1363         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1364         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1365         struct lfsck_instance             *lfsck = com->lc_lfsck;
1366         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1367         struct lfsck_layout               *lo    = com->lc_file_ram;
1368         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1369         struct ptlrpc_request_set         *set;
1370         struct lfsck_tgt_descs            *ltds;
1371         struct lfsck_tgt_desc             *ltd;
1372         struct lfsck_tgt_desc             *next;
1373         struct list_head                  *head;
1374         __u32                              idx;
1375         int                                rc    = 0;
1376         ENTRY;
1377
1378         set = ptlrpc_prep_set();
1379         if (set == NULL)
1380                 RETURN(-ENOMEM);
1381
1382         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1383         lr->lr_active = LT_LAYOUT;
1384         laia->laia_com = com;
1385         laia->laia_lr = lr;
1386         laia->laia_shared = 0;
1387         switch (lr->lr_event) {
1388         case LE_START:
1389                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1390                 ltds = &lfsck->li_ost_descs;
1391                 laia->laia_ltds = ltds;
1392                 down_read(&ltds->ltd_rw_sem);
1393                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1394                         ltd = lfsck_tgt_get(ltds, idx);
1395                         LASSERT(ltd != NULL);
1396
1397                         laia->laia_ltd = ltd;
1398                         ltd->ltd_layout_done = 0;
1399                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1400                                         lfsck_layout_master_async_interpret,
1401                                         laia, LFSCK_NOTIFY);
1402                         if (rc != 0) {
1403                                 CERROR("%s: fail to notify %s %x for layout "
1404                                        "start: rc = %d\n",
1405                                        lfsck_lfsck2name(lfsck),
1406                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1407                                        "MDT", idx, rc);
1408                                 lfsck_tgt_put(ltd);
1409                                 lo->ll_flags |= LF_INCOMPLETE;
1410                         }
1411                 }
1412                 up_read(&ltds->ltd_rw_sem);
1413
1414                 /* Sync up */
1415                 rc = ptlrpc_set_wait(set);
1416                 if (rc < 0) {
1417                         ptlrpc_set_destroy(set);
1418                         RETURN(rc);
1419                 }
1420
1421                 if (!(bk->lb_param & LPF_ALL_TGT))
1422                         break;
1423
1424                 /* link other MDT targets locallly. */
1425                 spin_lock(&ltds->ltd_lock);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = LTD_TGT(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         if (!list_empty(&ltd->ltd_layout_list))
1431                                 continue;
1432
1433                         list_add_tail(&ltd->ltd_layout_list,
1434                                       &llmd->llmd_mdt_list);
1435                         list_add_tail(&ltd->ltd_layout_phase_list,
1436                                       &llmd->llmd_mdt_phase1_list);
1437                 }
1438                 spin_unlock(&ltds->ltd_lock);
1439                 break;
1440         case LE_STOP:
1441         case LE_PHASE2_DONE:
1442         case LE_PEER_EXIT: {
1443                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1444                 if (bk->lb_param & LPF_ALL_TGT) {
1445                         head = &llmd->llmd_mdt_list;
1446                         ltds = &lfsck->li_mdt_descs;
1447                         if (lr->lr_event == LE_STOP) {
1448                                 /* unlink other MDT targets locallly. */
1449                                 spin_lock(&ltds->ltd_lock);
1450                                 list_for_each_entry_safe(ltd, next, head,
1451                                                          ltd_layout_list) {
1452                                         list_del_init(&ltd->ltd_layout_phase_list);
1453                                         list_del_init(&ltd->ltd_layout_list);
1454                                 }
1455                                 spin_unlock(&ltds->ltd_lock);
1456
1457                                 lr->lr_flags |= LEF_TO_OST;
1458                                 head = &llmd->llmd_ost_list;
1459                                 ltds = &lfsck->li_ost_descs;
1460                         } else {
1461                                 lr->lr_flags &= ~LEF_TO_OST;
1462                         }
1463                 } else {
1464                         lr->lr_flags |= LEF_TO_OST;
1465                         head = &llmd->llmd_ost_list;
1466                         ltds = &lfsck->li_ost_descs;
1467                 }
1468
1469 again:
1470                 laia->laia_ltds = ltds;
1471                 spin_lock(&ltds->ltd_lock);
1472                 while (!list_empty(head)) {
1473                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1474                                          ltd_layout_list);
1475                         if (!list_empty(&ltd->ltd_layout_phase_list))
1476                                 list_del_init(&ltd->ltd_layout_phase_list);
1477                         list_del_init(&ltd->ltd_layout_list);
1478                         atomic_inc(&ltd->ltd_ref);
1479                         laia->laia_ltd = ltd;
1480                         spin_unlock(&ltds->ltd_lock);
1481                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1482                                         lfsck_layout_master_async_interpret,
1483                                         laia, LFSCK_NOTIFY);
1484                         if (rc != 0) {
1485                                 CERROR("%s: fail to notify %s %x for layout "
1486                                        "stop/phase2: rc = %d\n",
1487                                        lfsck_lfsck2name(lfsck),
1488                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1489                                        "MDT", ltd->ltd_index, rc);
1490                                 lfsck_tgt_put(ltd);
1491                         }
1492                         spin_lock(&ltds->ltd_lock);
1493                 }
1494                 spin_unlock(&ltds->ltd_lock);
1495
1496                 rc = ptlrpc_set_wait(set);
1497                 if (rc < 0) {
1498                         ptlrpc_set_destroy(set);
1499                         RETURN(rc);
1500                 }
1501
1502                 if (!(lr->lr_flags & LEF_TO_OST)) {
1503                         lr->lr_flags |= LEF_TO_OST;
1504                         head = &llmd->llmd_ost_list;
1505                         ltds = &lfsck->li_ost_descs;
1506                         goto again;
1507                 }
1508                 break;
1509         }
1510         case LE_PHASE1_DONE:
1511                 llmd->llmd_touch_gen++;
1512                 ltds = &lfsck->li_mdt_descs;
1513                 laia->laia_ltds = ltds;
1514                 spin_lock(&ltds->ltd_lock);
1515                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1516                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1517                                          struct lfsck_tgt_desc,
1518                                          ltd_layout_phase_list);
1519                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1520                                 break;
1521
1522                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1523                         list_del_init(&ltd->ltd_layout_phase_list);
1524                         list_add_tail(&ltd->ltd_layout_phase_list,
1525                                       &llmd->llmd_mdt_phase1_list);
1526                         atomic_inc(&ltd->ltd_ref);
1527                         laia->laia_ltd = ltd;
1528                         spin_unlock(&ltds->ltd_lock);
1529                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1530                                         lfsck_layout_master_async_interpret,
1531                                         laia, LFSCK_NOTIFY);
1532                         if (rc != 0) {
1533                                 CERROR("%s: fail to notify MDT %x for layout "
1534                                        "phase1 done: rc = %d\n",
1535                                        lfsck_lfsck2name(lfsck),
1536                                        ltd->ltd_index, rc);
1537                                 lfsck_tgt_put(ltd);
1538                         }
1539                         spin_lock(&ltds->ltd_lock);
1540                 }
1541                 spin_unlock(&ltds->ltd_lock);
1542                 break;
1543         default:
1544                 CERROR("%s: unexpected LFSCK event: rc = %d\n",
1545                        lfsck_lfsck2name(lfsck), lr->lr_event);
1546                 rc = -EINVAL;
1547                 break;
1548         }
1549
1550         rc = ptlrpc_set_wait(set);
1551         ptlrpc_set_destroy(set);
1552
1553         RETURN(rc);
1554 }
1555
1556 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1557                                            struct lfsck_component *com,
1558                                            int rc)
1559 {
1560         struct lfsck_instance   *lfsck = com->lc_lfsck;
1561         struct lfsck_layout     *lo    = com->lc_file_ram;
1562         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1563
1564         down_write(&com->lc_sem);
1565
1566         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1567                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1568         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1569         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1570
1571         if (rc > 0) {
1572                 com->lc_journal = 0;
1573                 if (lo->ll_flags & LF_INCOMPLETE)
1574                         lo->ll_status = LS_PARTIAL;
1575                 else
1576                         lo->ll_status = LS_COMPLETED;
1577                 if (!(bk->lb_param & LPF_DRYRUN))
1578                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1579                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1580                 lo->ll_success_count++;
1581         } else if (rc == 0) {
1582                 lo->ll_status = lfsck->li_status;
1583                 if (lo->ll_status == 0)
1584                         lo->ll_status = LS_STOPPED;
1585         } else {
1586                 lo->ll_status = LS_FAILED;
1587         }
1588
1589         if (lo->ll_status != LS_PAUSED) {
1590                 spin_lock(&lfsck->li_lock);
1591                 list_del_init(&com->lc_link);
1592                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1593                 spin_unlock(&lfsck->li_lock);
1594         }
1595
1596         rc = lfsck_layout_store(env, com);
1597
1598         up_write(&com->lc_sem);
1599
1600         return rc;
1601 }
1602
1603 static int lfsck_layout_lock(const struct lu_env *env,
1604                              struct lfsck_component *com,
1605                              struct dt_object *obj,
1606                              struct lustre_handle *lh, __u64 bits)
1607 {
1608         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1609         ldlm_policy_data_t              *policy = &info->lti_policy;
1610         struct ldlm_res_id              *resid  = &info->lti_resid;
1611         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1612         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1613         int                              rc;
1614
1615         LASSERT(lfsck->li_namespace != NULL);
1616
1617         memset(policy, 0, sizeof(*policy));
1618         policy->l_inodebits.bits = bits;
1619         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1620         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1621                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1622                                     ldlm_completion_ast, NULL, NULL, 0,
1623                                     LVB_T_NONE, NULL, lh);
1624         if (rc == ELDLM_OK) {
1625                 rc = 0;
1626         } else {
1627                 memset(lh, 0, sizeof(*lh));
1628                 rc = -EIO;
1629         }
1630
1631         return rc;
1632 }
1633
1634 static void lfsck_layout_unlock(struct lustre_handle *lh)
1635 {
1636         if (lustre_handle_is_used(lh)) {
1637                 ldlm_lock_decref(lh, LCK_EX);
1638                 memset(lh, 0, sizeof(*lh));
1639         }
1640 }
1641
1642 static int lfsck_layout_trans_stop(const struct lu_env *env,
1643                                    struct dt_device *dev,
1644                                    struct thandle *handle, int result)
1645 {
1646         int rc;
1647
1648         handle->th_result = result;
1649         rc = dt_trans_stop(env, dev, handle);
1650         if (rc > 0)
1651                 rc = 0;
1652         else if (rc == 0)
1653                 rc = 1;
1654
1655         return rc;
1656 }
1657
1658 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
1659                                         struct lfsck_component *com,
1660                                         struct lfsck_tgt_desc *ltd,
1661                                         struct lu_orphan_rec *rec,
1662                                         struct lu_fid *cfid)
1663 {
1664         struct lfsck_layout             *lo     = com->lc_file_ram;
1665         int                              rc     = 0;
1666
1667         /* XXX: To be extended in other patch. */
1668
1669         down_write(&com->lc_sem);
1670         com->lc_new_scanned++;
1671         com->lc_new_checked++;
1672         if (rc > 0) {
1673                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
1674                 rc = 0;
1675         } else if (rc < 0) {
1676                 lo->ll_objs_failed_phase2++;
1677         }
1678         up_write(&com->lc_sem);
1679
1680         return rc;
1681 }
1682
1683 static int lfsck_layout_scan_orphan(const struct lu_env *env,
1684                                     struct lfsck_component *com,
1685                                     struct lfsck_tgt_desc *ltd)
1686 {
1687         struct lfsck_layout             *lo     = com->lc_file_ram;
1688         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1689         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1690         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1691         struct ost_id                   *oi     = &info->lti_oi;
1692         struct lu_fid                   *fid    = &info->lti_fid;
1693         struct dt_object                *obj;
1694         const struct dt_it_ops          *iops;
1695         struct dt_it                    *di;
1696         int                              rc     = 0;
1697         ENTRY;
1698
1699         CDEBUG(D_LFSCK, "%s: start the orphan scanning for OST%04x\n",
1700                lfsck_lfsck2name(lfsck), ltd->ltd_index);
1701
1702         ostid_set_seq(oi, FID_SEQ_IDIF);
1703         ostid_set_id(oi, 0);
1704         ostid_to_fid(fid, oi, ltd->ltd_index);
1705         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
1706         if (unlikely(IS_ERR(obj)))
1707                 RETURN(PTR_ERR(obj));
1708
1709         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
1710         if (rc != 0)
1711                 GOTO(put, rc);
1712
1713         iops = &obj->do_index_ops->dio_it;
1714         di = iops->init(env, obj, 0, BYPASS_CAPA);
1715         if (IS_ERR(di))
1716                 GOTO(put, rc = PTR_ERR(di));
1717
1718         rc = iops->load(env, di, 0);
1719         if (rc == -ESRCH) {
1720                 /* -ESRCH means that the orphan OST-objects rbtree has been
1721                  * cleanup because of the OSS server restart or other errors. */
1722                 lo->ll_flags |= LF_INCOMPLETE;
1723                 GOTO(fini, rc);
1724         }
1725
1726         if (rc == 0)
1727                 rc = iops->next(env, di);
1728         else if (rc > 0)
1729                 rc = 0;
1730
1731         if (rc < 0)
1732                 GOTO(fini, rc);
1733
1734         if (rc > 0)
1735                 GOTO(fini, rc = 0);
1736
1737         do {
1738                 struct dt_key           *key;
1739                 struct lu_orphan_rec    *rec = &info->lti_rec;
1740
1741                 key = iops->key(env, di);
1742                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
1743                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
1744                 if (rc == 0)
1745                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
1746                                         &com->lc_fid_latest_scanned_phase2);
1747                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
1748                         GOTO(fini, rc);
1749
1750                 lfsck_control_speed_by_self(com);
1751                 do {
1752                         rc = iops->next(env, di);
1753                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
1754         } while (rc == 0);
1755
1756         GOTO(fini, rc);
1757
1758 fini:
1759         iops->put(env, di);
1760         iops->fini(env, di);
1761 put:
1762         lu_object_put(env, &obj->do_lu);
1763
1764         CDEBUG(D_LFSCK, "%s: finish the orphan scanning for OST%04x, rc = %d\n",
1765                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
1766
1767         return rc > 0 ? 0 : rc;
1768 }
1769
1770 /* For the MDT-object with dangling reference, we need to re-create
1771  * the missed OST-object with the known FID/owner information. */
1772 static int lfsck_layout_recreate_ostobj(const struct lu_env *env,
1773                                         struct lfsck_component *com,
1774                                         struct lfsck_layout_req *llr,
1775                                         struct lu_attr *la)
1776 {
1777         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1778         struct filter_fid               *pfid   = &info->lti_new_pfid;
1779         struct dt_allocation_hint       *hint   = &info->lti_hint;
1780         struct dt_object                *parent = llr->llr_parent->llo_obj;
1781         struct dt_object                *child  = llr->llr_child;
1782         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
1783         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
1784         struct thandle                  *handle;
1785         struct lu_buf                   *buf;
1786         struct lustre_handle             lh     = { 0 };
1787         int                              rc;
1788         ENTRY;
1789
1790         CDEBUG(D_LFSCK, "Repair dangling reference for: parent "DFID
1791                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
1792                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
1793                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid);
1794
1795         rc = lfsck_layout_lock(env, com, parent, &lh,
1796                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
1797         if (rc != 0)
1798                 RETURN(rc);
1799
1800         handle = dt_trans_create(env, dev);
1801         if (IS_ERR(handle))
1802                 GOTO(unlock1, rc = PTR_ERR(handle));
1803
1804         hint->dah_parent = NULL;
1805         hint->dah_mode = 0;
1806         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1807         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1808         pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
1809         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1810
1811         rc = dt_declare_create(env, child, la, hint, NULL, handle);
1812         if (rc != 0)
1813                 GOTO(stop, rc);
1814
1815         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
1816                                   LU_XATTR_CREATE, handle);
1817         if (rc != 0)
1818                 GOTO(stop, rc);
1819
1820         rc = dt_trans_start(env, dev, handle);
1821         if (rc != 0)
1822                 GOTO(stop, rc);
1823
1824         dt_read_lock(env, parent, 0);
1825         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
1826                 GOTO(unlock2, rc = 1);
1827
1828         rc = dt_create(env, child, la, hint, NULL, handle);
1829         if (rc != 0)
1830                 GOTO(unlock2, rc);
1831
1832         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
1833                           handle, BYPASS_CAPA);
1834
1835         GOTO(unlock2, rc);
1836
1837 unlock2:
1838         dt_read_unlock(env, parent);
1839
1840 stop:
1841         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
1842
1843 unlock1:
1844         lfsck_layout_unlock(&lh);
1845
1846         return rc;
1847 }
1848
1849 /* If the OST-object does not recognize the MDT-object as its parent, and
1850  * there is no other MDT-object claims as its parent, then just trust the
1851  * given MDT-object as its parent. So update the OST-object filter_fid. */
1852 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
1853                                               struct lfsck_component *com,
1854                                               struct lfsck_layout_req *llr,
1855                                               const struct lu_attr *pla)
1856 {
1857         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1858         struct filter_fid               *pfid   = &info->lti_new_pfid;
1859         struct lu_attr                  *tla    = &info->lti_la3;
1860         struct dt_object                *parent = llr->llr_parent->llo_obj;
1861         struct dt_object                *child  = llr->llr_child;
1862         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
1863         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
1864         struct thandle                  *handle;
1865         struct lu_buf                   *buf;
1866         struct lustre_handle             lh     = { 0 };
1867         int                              rc;
1868         ENTRY;
1869
1870         CDEBUG(D_LFSCK, "Repair unmatched MDT-OST pair for: parent "DFID
1871                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
1872                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
1873                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
1874
1875         rc = lfsck_layout_lock(env, com, parent, &lh,
1876                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
1877         if (rc != 0)
1878                 RETURN(rc);
1879
1880         handle = dt_trans_create(env, dev);
1881         if (IS_ERR(handle))
1882                 GOTO(unlock1, rc = PTR_ERR(handle));
1883
1884         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1885         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1886         /* The ff_parent->f_ver is not the real parent fid->f_ver. Instead,
1887          * it is the OST-object index in the parent MDT-object layout. */
1888         pfid->ff_parent.f_ver = cpu_to_le32(llr->llr_lov_idx);
1889         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1890
1891         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1892         if (rc != 0)
1893                 GOTO(stop, rc);
1894
1895         tla->la_valid = LA_UID | LA_GID;
1896         tla->la_uid = pla->la_uid;
1897         tla->la_gid = pla->la_gid;
1898         rc = dt_declare_attr_set(env, child, tla, handle);
1899         if (rc != 0)
1900                 GOTO(stop, rc);
1901
1902         rc = dt_trans_start(env, dev, handle);
1903         if (rc != 0)
1904                 GOTO(stop, rc);
1905
1906         dt_write_lock(env, parent, 0);
1907         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
1908                 GOTO(unlock2, rc = 1);
1909
1910         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1911                           BYPASS_CAPA);
1912         if (rc != 0)
1913                 GOTO(unlock2, rc);
1914
1915         /* Get the latest parent's owner. */
1916         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
1917         if (rc != 0)
1918                 GOTO(unlock2, rc);
1919
1920         tla->la_valid = LA_UID | LA_GID;
1921         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
1922
1923         GOTO(unlock2, rc);
1924
1925 unlock2:
1926         dt_write_unlock(env, parent);
1927
1928 stop:
1929         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
1930
1931 unlock1:
1932         lfsck_layout_unlock(&lh);
1933
1934         return rc;
1935 }
1936
1937 /* If there are more than one MDT-objects claim as the OST-object's parent,
1938  * and the OST-object only recognizes one of them, then we need to generate
1939  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
1940 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
1941                                                    struct lfsck_component *com,
1942                                                    struct lfsck_layout_req *llr,
1943                                                    struct lu_attr *la,
1944                                                    struct lu_buf *buf)
1945 {
1946         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1947         struct dt_allocation_hint       *hint   = &info->lti_hint;
1948         struct dt_object_format         *dof    = &info->lti_dof;
1949         struct dt_device                *pdev   = com->lc_lfsck->li_next;
1950         struct ost_id                   *oi     = &info->lti_oi;
1951         struct dt_object                *parent = llr->llr_parent->llo_obj;
1952         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
1953         struct dt_object                *child  = NULL;
1954         struct lu_device                *d      = &cdev->dd_lu_dev;
1955         struct lu_object                *o      = NULL;
1956         struct thandle                  *handle;
1957         struct lov_mds_md_v1            *lmm;
1958         struct lov_ost_data_v1          *objs;
1959         struct lustre_handle             lh     = { 0 };
1960         __u32                            magic;
1961         int                              rc;
1962         ENTRY;
1963
1964         CDEBUG(D_LFSCK, "Repair multiple references for: parent "DFID
1965                ", OST-index %u, stripe-index %u, owner %u:%u\n",
1966                PFID(lfsck_dto2fid(parent)), llr->llr_ost_idx,
1967                llr->llr_lov_idx, la->la_uid, la->la_gid);
1968
1969         rc = lfsck_layout_lock(env, com, parent, &lh,
1970                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
1971         if (rc != 0)
1972                 RETURN(rc);
1973
1974         handle = dt_trans_create(env, pdev);
1975         if (IS_ERR(handle))
1976                 GOTO(unlock1, rc = PTR_ERR(handle));
1977
1978         o = lu_object_anon(env, d, NULL);
1979         if (IS_ERR(o))
1980                 GOTO(stop, rc = PTR_ERR(o));
1981
1982         child = container_of(o, struct dt_object, do_lu);
1983         o = lu_object_locate(o->lo_header, d->ld_type);
1984         if (unlikely(o == NULL))
1985                 GOTO(stop, rc = -EINVAL);
1986
1987         child = container_of(o, struct dt_object, do_lu);
1988         la->la_valid = LA_UID | LA_GID;
1989         hint->dah_parent = NULL;
1990         hint->dah_mode = 0;
1991         dof->dof_type = DFT_REGULAR;
1992         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
1993         if (rc != 0)
1994                 GOTO(stop, rc);
1995
1996         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
1997                                   LU_XATTR_REPLACE, handle);
1998         if (rc != 0)
1999                 GOTO(stop, rc);
2000
2001         rc = dt_trans_start(env, pdev, handle);
2002         if (rc != 0)
2003                 GOTO(stop, rc);
2004
2005         dt_write_lock(env, parent, 0);
2006         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2007                 GOTO(unlock2, rc = 0);
2008
2009         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2010         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
2011                 GOTO(unlock2, rc = 0);
2012
2013         lmm = buf->lb_buf;
2014         rc = lfsck_layout_verify_header(lmm);
2015         if (rc != 0)
2016                 GOTO(unlock2, rc);
2017
2018         /* Someone change layout during the LFSCK, no need to repair then. */
2019         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
2020                 GOTO(unlock2, rc = 0);
2021
2022         rc = dt_create(env, child, la, hint, dof, handle);
2023         if (rc != 0)
2024                 GOTO(unlock2, rc);
2025
2026         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2027          * been verified in lfsck_layout_verify_header() already. If some
2028          * new magic introduced in the future, then layout LFSCK needs to
2029          * be updated also. */
2030         magic = le32_to_cpu(lmm->lmm_magic);
2031         if (magic == LOV_MAGIC_V1) {
2032                 objs = &(lmm->lmm_objects[0]);
2033         } else {
2034                 LASSERT(magic == LOV_MAGIC_V3);
2035                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2036         }
2037
2038         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
2039         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
2040         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
2041         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
2042         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
2043         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2044                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
2045
2046         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
2047
2048 unlock2:
2049         dt_write_unlock(env, parent);
2050
2051 stop:
2052         if (child != NULL)
2053                 lu_object_put(env, &child->do_lu);
2054
2055         dt_trans_stop(env, pdev, handle);
2056
2057 unlock1:
2058         lfsck_layout_unlock(&lh);
2059
2060         return rc;
2061 }
2062
2063 /* If the MDT-object and the OST-object have different owner information,
2064  * then trust the MDT-object, because the normal chown/chgrp handle order
2065  * is from MDT to OST, and it is possible that some chown/chgrp operation
2066  * is partly done. */
2067 static int lfsck_layout_repair_owner(const struct lu_env *env,
2068                                      struct lfsck_component *com,
2069                                      struct lfsck_layout_req *llr,
2070                                      struct lu_attr *pla)
2071 {
2072         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2073         struct lu_attr                  *tla    = &info->lti_la3;
2074         struct dt_object                *parent = llr->llr_parent->llo_obj;
2075         struct dt_object                *child  = llr->llr_child;
2076         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2077         struct thandle                  *handle;
2078         int                              rc;
2079         ENTRY;
2080
2081         CDEBUG(D_LFSCK, "Repair inconsistent file owner for: parent "DFID
2082                ", child "DFID", OST-index %u, stripe-index %u, owner %u:%u\n",
2083                PFID(lfsck_dto2fid(parent)), PFID(lfsck_dto2fid(child)),
2084                llr->llr_ost_idx, llr->llr_lov_idx, pla->la_uid, pla->la_gid);
2085
2086         handle = dt_trans_create(env, dev);
2087         if (IS_ERR(handle))
2088                 RETURN(PTR_ERR(handle));
2089
2090         tla->la_uid = pla->la_uid;
2091         tla->la_gid = pla->la_gid;
2092         tla->la_valid = LA_UID | LA_GID;
2093         rc = dt_declare_attr_set(env, child, tla, handle);
2094         if (rc != 0)
2095                 GOTO(stop, rc);
2096
2097         rc = dt_trans_start(env, dev, handle);
2098         if (rc != 0)
2099                 GOTO(stop, rc);
2100
2101         /* Use the dt_object lock to serialize with destroy and attr_set. */
2102         dt_read_lock(env, parent, 0);
2103         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2104                 GOTO(unlock, rc = 1);
2105
2106         /* Get the latest parent's owner. */
2107         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
2108         if (rc != 0) {
2109                 CWARN("%s: fail to get the latest parent's ("DFID") owner, "
2110                       "not sure whether some others chown/chgrp during the "
2111                       "LFSCK: rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2112                       PFID(lfsck_dto2fid(parent)), rc);
2113
2114                 GOTO(unlock, rc);
2115         }
2116
2117         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
2118         if (unlikely(tla->la_uid != pla->la_uid ||
2119                      tla->la_gid != pla->la_gid))
2120                 GOTO(unlock, rc = 1);
2121
2122         tla->la_valid = LA_UID | LA_GID;
2123         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
2124
2125         GOTO(unlock, rc);
2126
2127 unlock:
2128         dt_read_unlock(env, parent);
2129
2130 stop:
2131         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2132
2133         return rc;
2134 }
2135
2136 /* Check whether the OST-object correctly back points to the
2137  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
2138 static int lfsck_layout_check_parent(const struct lu_env *env,
2139                                      struct lfsck_component *com,
2140                                      struct dt_object *parent,
2141                                      const struct lu_fid *pfid,
2142                                      const struct lu_fid *cfid,
2143                                      const struct lu_attr *pla,
2144                                      const struct lu_attr *cla,
2145                                      struct lfsck_layout_req *llr,
2146                                      struct lu_buf *lov_ea, __u32 idx)
2147 {
2148         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2149         struct lu_buf                   *buf    = &info->lti_big_buf;
2150         struct dt_object                *tobj;
2151         struct lov_mds_md_v1            *lmm;
2152         struct lov_ost_data_v1          *objs;
2153         int                              rc;
2154         int                              i;
2155         __u32                            magic;
2156         __u16                            count;
2157         ENTRY;
2158
2159         if (fid_is_zero(pfid)) {
2160                 /* client never wrote. */
2161                 if (cla->la_size == 0 && cla->la_blocks == 0) {
2162                         if (unlikely(cla->la_uid != pla->la_uid ||
2163                                      cla->la_gid != pla->la_gid))
2164                                 RETURN (LLIT_INCONSISTENT_OWNER);
2165
2166                         RETURN(0);
2167                 }
2168
2169                 RETURN(LLIT_UNMATCHED_PAIR);
2170         }
2171
2172         if (unlikely(!fid_is_sane(pfid)))
2173                 RETURN(LLIT_UNMATCHED_PAIR);
2174
2175         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
2176                 if (llr->llr_lov_idx == idx)
2177                         RETURN(0);
2178
2179                 RETURN(LLIT_UNMATCHED_PAIR);
2180         }
2181
2182         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
2183         if (tobj == NULL)
2184                 RETURN(LLIT_UNMATCHED_PAIR);
2185
2186         if (IS_ERR(tobj))
2187                 RETURN(PTR_ERR(tobj));
2188
2189         if (!dt_object_exists(tobj))
2190                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2191
2192         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
2193          * remote one on another MDT. Then check whether the given OST-object
2194          * is in such layout. If yes, it is multiple referenced, otherwise it
2195          * is unmatched referenced case. */
2196         rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
2197         if (rc == 0)
2198                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2199
2200         if (rc < 0)
2201                 GOTO(out, rc);
2202
2203         lmm = buf->lb_buf;
2204         rc = lfsck_layout_verify_header(lmm);
2205         if (rc != 0)
2206                 GOTO(out, rc);
2207
2208         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2209          * been verified in lfsck_layout_verify_header() already. If some
2210          * new magic introduced in the future, then layout LFSCK needs to
2211          * be updated also. */
2212         magic = le32_to_cpu(lmm->lmm_magic);
2213         if (magic == LOV_MAGIC_V1) {
2214                 objs = &(lmm->lmm_objects[0]);
2215         } else {
2216                 LASSERT(magic == LOV_MAGIC_V3);
2217                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2218         }
2219
2220         count = le16_to_cpu(lmm->lmm_stripe_count);
2221         for (i = 0; i < count; i++, objs++) {
2222                 struct lu_fid           *tfid   = &info->lti_fid2;
2223                 struct ost_id           *oi     = &info->lti_oi;
2224
2225                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2226                 ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
2227                 if (lu_fid_eq(cfid, tfid)) {
2228                         *lov_ea = *buf;
2229
2230                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
2231                 }
2232         }
2233
2234         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
2235
2236 out:
2237         lfsck_object_put(env, tobj);
2238
2239         return rc;
2240 }
2241
2242 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
2243                                              struct lfsck_component *com,
2244                                              struct lfsck_layout_req *llr)
2245 {
2246         struct lfsck_layout                  *lo     = com->lc_file_ram;
2247         struct lfsck_thread_info             *info   = lfsck_env_info(env);
2248         struct filter_fid_old                *pea    = &info->lti_old_pfid;
2249         struct lu_fid                        *pfid   = &info->lti_fid;
2250         struct lu_buf                        *buf    = NULL;
2251         struct dt_object                     *parent = llr->llr_parent->llo_obj;
2252         struct dt_object                     *child  = llr->llr_child;
2253         struct lu_attr                       *pla    = &info->lti_la;
2254         struct lu_attr                       *cla    = &info->lti_la2;
2255         struct lfsck_instance                *lfsck  = com->lc_lfsck;
2256         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
2257         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
2258         __u32                                 idx    = 0;
2259         int                                   rc;
2260         ENTRY;
2261
2262         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
2263         if (rc != 0) {
2264                 if (lu_object_is_dying(parent->do_lu.lo_header))
2265                         RETURN(0);
2266
2267                 GOTO(out, rc);
2268         }
2269
2270         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
2271         if (rc == -ENOENT) {
2272                 if (lu_object_is_dying(parent->do_lu.lo_header))
2273                         RETURN(0);
2274
2275                 type = LLIT_DANGLING;
2276                 goto repair;
2277         }
2278
2279         if (rc != 0)
2280                 GOTO(out, rc);
2281
2282         buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
2283         rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
2284         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
2285                      rc != sizeof(struct filter_fid))) {
2286                 type = LLIT_UNMATCHED_PAIR;
2287                 goto repair;
2288         }
2289
2290         if (rc < 0 && rc != -ENODATA)
2291                 GOTO(out, rc);
2292
2293         if (rc == -ENODATA) {
2294                 fid_zero(pfid);
2295         } else {
2296                 fid_le_to_cpu(pfid, &pea->ff_parent);
2297                 /* OST-object does not save parent FID::f_ver, instead,
2298                  * the OST-object index in the parent MDT-object layout
2299                  * EA reuses the pfid->f_ver. */
2300                 idx = pfid->f_ver;
2301                 pfid->f_ver = 0;
2302         }
2303
2304         rc = lfsck_layout_check_parent(env, com, parent, pfid,
2305                                        lu_object_fid(&child->do_lu),
2306                                        pla, cla, llr, buf, idx);
2307         if (rc > 0) {
2308                 type = rc;
2309                 goto repair;
2310         }
2311
2312         if (rc < 0)
2313                 GOTO(out, rc);
2314
2315         if (unlikely(cla->la_uid != pla->la_uid ||
2316                      cla->la_gid != pla->la_gid)) {
2317                 type = LLIT_INCONSISTENT_OWNER;
2318                 goto repair;
2319         }
2320
2321 repair:
2322         if (bk->lb_param & LPF_DRYRUN) {
2323                 if (type != LLIT_NONE)
2324                         GOTO(out, rc = 1);
2325                 else
2326                         GOTO(out, rc = 0);
2327         }
2328
2329         switch (type) {
2330         case LLIT_DANGLING:
2331                 memset(cla, 0, sizeof(*cla));
2332                 cla->la_uid = pla->la_uid;
2333                 cla->la_gid = pla->la_gid;
2334                 cla->la_mode = S_IFREG | 0666;
2335                 cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2336                                 LA_ATIME | LA_MTIME | LA_CTIME;
2337                 rc = lfsck_layout_recreate_ostobj(env, com, llr, cla);
2338                 break;
2339         case LLIT_UNMATCHED_PAIR:
2340                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
2341                 break;
2342         case LLIT_MULTIPLE_REFERENCED:
2343                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
2344                                                              pla, buf);
2345                 break;
2346         case LLIT_INCONSISTENT_OWNER:
2347                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
2348                 break;
2349         default:
2350                 rc = 0;
2351                 break;
2352         }
2353
2354         GOTO(out, rc);
2355
2356 out:
2357         down_write(&com->lc_sem);
2358         if (rc < 0) {
2359                 /* If cannot touch the target server,
2360                  * mark the LFSCK as INCOMPLETE. */
2361                 if (rc == -ENOTCONN || rc == -ESHUTDOWN || rc == -ETIMEDOUT ||
2362                     rc == -EHOSTDOWN || rc == -EHOSTUNREACH) {
2363                         CERROR("%s: Fail to talk with OST %x: rc = %d.\n",
2364                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
2365                         lo->ll_flags |= LF_INCOMPLETE;
2366                         lo->ll_objs_skipped++;
2367                         rc = 0;
2368                 } else {
2369                         lo->ll_objs_failed_phase1++;
2370                 }
2371         } else if (rc > 0) {
2372                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
2373                          "unknown type = %d\n", type);
2374
2375                 lo->ll_objs_repaired[type - 1]++;
2376         }
2377         up_write(&com->lc_sem);
2378
2379         return rc;
2380 }
2381
/**
 * Main function of the layout LFSCK assistant thread.
 *
 * Sequence implemented below:
 * - broadcast LE_START (built from the in-RAM bookmark) through
 *   lfsck_layout_master_notify_others(); on failure jump straight to "fini";
 * - flag the assistant thread SVC_RUNNING and wake the master's waitq;
 * - main loop: handle every request queued on llmd_req_list with
 *   lfsck_layout_assistant_handle_one(), then sleep until there is a new
 *   request or one of llmd_exit / llmd_to_post / llmd_to_double_scan is set;
 * - llmd_to_post: broadcast LE_PHASE1_DONE with llmd_post_result;
 * - llmd_to_double_scan: query the other targets and run
 *   lfsck_layout_scan_orphan() for every target found on
 *   llmd_ost_phase2_list, until llmd_ost_phase1_list is empty (rc = 1);
 * - on exit: drop unhandled requests (cleanup1), broadcast the final event
 *   (cleanup2), then publish llmd_assistant_status and set SVC_STOPPED (fini).
 *
 * \param[in] args      the struct lfsck_thread_args of this thread; it is
 *                      released with lfsck_thread_args_fini() before return
 *
 * \return              the final "rc" (the value stored in
 *                      llmd_assistant_status may differ, see "fini")
 */
2382 static int lfsck_layout_assistant(void *args)
2383 {
2384         struct lfsck_thread_args        *lta     = args;
2385         struct lu_env                   *env     = &lta->lta_env;
2386         struct lfsck_component          *com     = lta->lta_com;
2387         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
2388         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
2389         struct lfsck_position           *pos     = &com->lc_pos_start;
2390         struct lfsck_thread_info        *info    = lfsck_env_info(env);
2391         struct lfsck_request            *lr      = &info->lti_lr;
2392         struct lfsck_layout_master_data *llmd    = com->lc_data;
2393         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2394         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2395         struct lfsck_layout_req         *llr;
2396         struct l_wait_info               lwi     = { 0 };
2397         int                              rc      = 0;
2398         int                              rc1     = 0;
2399         ENTRY;
2400
             /* Build the LE_START notification from the in-RAM bookmark. */
2401         memset(lr, 0, sizeof(*lr));
2402         lr->lr_event = LE_START;
2403         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2404                        LSV_ASYNC_WINDOWS;
2405         lr->lr_speed = bk->lb_speed_limit;
2406         lr->lr_version = bk->lb_version;
2407         lr->lr_param = bk->lb_param;
2408         lr->lr_async_windows = bk->lb_async_windows;
2409         lr->lr_flags = LEF_TO_OST;
             /* NOTE(review): a start cookie <= 1 presumably means "scan from
              * the beginning", hence the reset request - confirm. */
2410         if (pos->lp_oit_cookie <= 1)
2411                 lr->lr_param |= LPF_RESET;
2412
2413         rc = lfsck_layout_master_notify_others(env, com, lr);
2414         if (rc != 0) {
2415                 CERROR("%s: fail to notify others for layout start: rc = %d\n",
2416                        lfsck_lfsck2name(lfsck), rc);
2417                 GOTO(fini, rc);
2418         }
2419
             /* Mark the assistant as running and wake whoever sleeps on the
              * master thread's waitq. */
2420         spin_lock(&llmd->llmd_lock);
2421         thread_set_flags(athread, SVC_RUNNING);
2422         spin_unlock(&llmd->llmd_lock);
2423         wake_up_all(&mthread->t_ctl_waitq);
2424
2425         while (1) {
2426                 while (!list_empty(&llmd->llmd_req_list)) {
2427                         bool wakeup = false;
2428
2429                         if (unlikely(llmd->llmd_exit))
2430                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
2431
2432                         llr = list_entry(llmd->llmd_req_list.next,
2433                                          struct lfsck_layout_req,
2434                                          llr_list);
2435                         /* Only the lfsck_layout_assistant thread itself can
2436                          * remove the "llr" from the head of the list, LFSCK
2437                          * engine thread only inserts other new "llr" at the
2438                          * end of the list. So it is safe to handle current
2439                          * "llr" without the spin_lock. */
2440                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
2441                         spin_lock(&llmd->llmd_lock);
2442                         list_del_init(&llr->llr_list);
                             /* Wake the master's waitq only if the prefetch
                              * count had reached the async window limit before
                              * this request is retired. */
2443                         if (bk->lb_async_windows != 0 &&
2444                             llmd->llmd_prefetched >= bk->lb_async_windows)
2445                                 wakeup = true;
2446
2447                         llmd->llmd_prefetched--;
2448                         spin_unlock(&llmd->llmd_lock);
2449                         if (wakeup)
2450                                 wake_up_all(&mthread->t_ctl_waitq);
2451
2452                         lfsck_layout_req_fini(env, llr);
2453                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
2454                                 GOTO(cleanup1, rc);
2455                 }
2456
2457                 /* Wakeup the master engine if it is waiting in checkpoint. */
2458                 wake_up_all(&mthread->t_ctl_waitq);
2459
2460                 l_wait_event(athread->t_ctl_waitq,
2461                              !lfsck_layout_req_empty(llmd) ||
2462                              llmd->llmd_exit ||
2463                              llmd->llmd_to_post ||
2464                              llmd->llmd_to_double_scan,
2465                              &lwi);
2466
2467                 if (unlikely(llmd->llmd_exit))
2468                         GOTO(cleanup1, rc = llmd->llmd_post_result);
2469
                     /* New requests showed up: handle them before looking at
                      * the post / double-scan flags. */
2470                 if (!list_empty(&llmd->llmd_req_list))
2471                         continue;
2472
2473                 if (llmd->llmd_to_post) {
2474                         llmd->llmd_to_post = 0;
2475                         LASSERT(llmd->llmd_post_result > 0);
2476
2477                         memset(lr, 0, sizeof(*lr));
2478                         lr->lr_event = LE_PHASE1_DONE;
2479                         lr->lr_status = llmd->llmd_post_result;
2480                         rc = lfsck_layout_master_notify_others(env, com, lr);
2481                         if (rc != 0)
2482                                 CERROR("%s: failed to notify others "
2483                                        "for layout post: rc = %d\n",
2484                                        lfsck_lfsck2name(lfsck), rc);
2485
2486                         /* Wakeup the master engine to go ahead. */
2487                         wake_up_all(&mthread->t_ctl_waitq);
2488                 }
2489
2490                 if (llmd->llmd_to_double_scan) {
2491                         llmd->llmd_to_double_scan = 0;
2492                         atomic_inc(&lfsck->li_double_scan_count);
2493                         llmd->llmd_in_double_scan = 1;
2494                         wake_up_all(&mthread->t_ctl_waitq);
2495
                             /* Restart the per-component counters and the
                              * checkpoint timers for this phase. */
2496                         com->lc_new_checked = 0;
2497                         com->lc_new_scanned = 0;
2498                         com->lc_time_last_checkpoint = cfs_time_current();
2499                         com->lc_time_next_checkpoint =
2500                                 com->lc_time_last_checkpoint +
2501                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2502
2503                         while (llmd->llmd_in_double_scan) {
2504                                 struct lfsck_tgt_descs  *ltds =
2505                                                         &lfsck->li_ost_descs;
2506                                 struct lfsck_tgt_desc   *ltd;
2507
2508                                 rc = lfsck_layout_master_query_others(env, com);
                                     /* Skips the timed wait below; "ltds" has
                                      * already been initialized above. */
2509                                 if (lfsck_layout_master_to_orphan(llmd))
2510                                         goto orphan;
2511
2512                                 if (rc < 0)
2513                                         GOTO(cleanup2, rc);
2514
2515                                 /* Pull LFSCK status on related targets once
2516                                  * per 30 seconds if we are not notified. */
2517                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
2518                                                            cfs_time_seconds(1),
2519                                                            NULL, NULL);
2520                                 rc = l_wait_event(athread->t_ctl_waitq,
2521                                         lfsck_layout_master_to_orphan(llmd) ||
2522                                         llmd->llmd_exit ||
2523                                         !thread_is_running(mthread),
2524                                         &lwi);
2525
2526                                 if (unlikely(llmd->llmd_exit ||
2527                                              !thread_is_running(mthread)))
2528                                         GOTO(cleanup2, rc = 0);
2529
2530                                 if (rc == -ETIMEDOUT)
2531                                         continue;
2532
2533                                 if (rc < 0)
2534                                         GOTO(cleanup2, rc);
2535
2536 orphan:
                                     /* ltd_lock is dropped around each
                                      * lfsck_layout_scan_orphan() call and
                                      * re-taken before the list is examined
                                      * again. */
2537                                 spin_lock(&ltds->ltd_lock);
2538                                 while (!list_empty(
2539                                                 &llmd->llmd_ost_phase2_list)) {
2540                                         ltd = list_entry(
2541                                               llmd->llmd_ost_phase2_list.next,
2542                                               struct lfsck_tgt_desc,
2543                                               ltd_layout_phase_list);
2544                                         list_del_init(
2545                                                 &ltd->ltd_layout_phase_list);
2546                                         spin_unlock(&ltds->ltd_lock);
2547
2548                                         if (bk->lb_param & LPF_ALL_TGT) {
2549                                                 rc = lfsck_layout_scan_orphan(
2550                                                                 env, com, ltd);
2551                                                 if (rc != 0 &&
2552                                                     bk->lb_param & LPF_FAILOUT)
2553                                                         GOTO(cleanup2, rc);
2554                                         }
2555
2556                                         if (unlikely(llmd->llmd_exit ||
2557                                                 !thread_is_running(mthread)))
2558                                                 GOTO(cleanup2, rc = 0);
2559
2560                                         spin_lock(&ltds->ltd_lock);
2561                                 }
2562
                                     /* No target left on the phase1 list:
                                      * leave with rc = 1, which "cleanup2"
                                      * reports as LE_PHASE2_DONE. */
2563                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
2564                                         spin_unlock(&ltds->ltd_lock);
2565                                         GOTO(cleanup2, rc = 1);
2566                                 }
2567                                 spin_unlock(&ltds->ltd_lock);
2568                         }
2569                 }
2570         }
2571
2572 cleanup1:
2573         /* Cleanup the unfinished requests. */
2574         spin_lock(&llmd->llmd_lock);
2575         if (rc < 0)
2576                 llmd->llmd_assistant_status = rc;
2577
2578         while (!list_empty(&llmd->llmd_req_list)) {
2579                 llr = list_entry(llmd->llmd_req_list.next,
2580                                  struct lfsck_layout_req,
2581                                  llr_list);
2582                 list_del_init(&llr->llr_list);
2583                 llmd->llmd_prefetched--;
2584                 spin_unlock(&llmd->llmd_lock);
2585                 lfsck_layout_req_fini(env, llr);
2586                 spin_lock(&llmd->llmd_lock);
2587         }
2588         spin_unlock(&llmd->llmd_lock);
2589
2590         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
2591                  llmd->llmd_prefetched);
2592
2593 cleanup2:
             /* Pick the event/status to broadcast from the sign of rc:
              * rc > 0 -> LE_PHASE2_DONE, rc == 0 -> stop/peer-exit,
              * rc < 0 -> failure. */
2594         memset(lr, 0, sizeof(*lr));
2595         if (rc > 0) {
2596                 lr->lr_event = LE_PHASE2_DONE;
2597                 lr->lr_status = rc;
2598         } else if (rc == 0) {
2599                 if (lfsck->li_flags & LPF_ALL_TGT) {
2600                         lr->lr_event = LE_STOP;
2601                         lr->lr_status = LS_STOPPED;
2602                 } else {
2603                         lr->lr_event = LE_PEER_EXIT;
2604                         switch (lfsck->li_status) {
2605                         case LS_PAUSED:
2606                         case LS_CO_PAUSED:
2607                                 lr->lr_status = LS_CO_PAUSED;
2608                                 break;
2609                         case LS_STOPPED:
2610                         case LS_CO_STOPPED:
2611                                 lr->lr_status = LS_CO_STOPPED;
2612                                 break;
2613                         default:
2614                                 CERROR("%s: unknown status: rc = %d\n",
2615                                        lfsck_lfsck2name(lfsck),
2616                                        lfsck->li_status);
2617                                 lr->lr_status = LS_CO_FAILED;
2618                                 break;
2619                         }
2620                 }
2621         } else {
2622                 if (lfsck->li_flags & LPF_ALL_TGT) {
2623                         lr->lr_event = LE_STOP;
2624                         lr->lr_status = LS_FAILED;
2625                 } else {
2626                         lr->lr_event = LE_PEER_EXIT;
2627                         lr->lr_status = LS_CO_FAILED;
2628                 }
2629         }
2630
2631         rc1 = lfsck_layout_master_notify_others(env, com, lr);
2632         if (rc1 != 0) {
2633                 CERROR("%s: failed to notify others for layout quit: rc = %d\n",
2634                        lfsck_lfsck2name(lfsck), rc1);
2635                 rc = rc1;
2636         }
2637
2638         /* Under force exit case, some requests may be just freed without
2639          * verification, those objects should be re-handled when next run.
2640          * So not update the on-disk tracing file under such case. */
2641         if (!llmd->llmd_exit)
2642                 rc1 = lfsck_layout_double_scan_result(env, com, rc);
2643
2644 fini:
             /* Also reached directly when the LE_START notification failed. */
2645         if (llmd->llmd_in_double_scan)
2646                 atomic_dec(&lfsck->li_double_scan_count);
2647
             /* Publish the final status (a non-zero rc1 takes precedence over
              * rc), mark the assistant stopped and wake the master's waitq. */
2648         spin_lock(&llmd->llmd_lock);
2649         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
2650         thread_set_flags(athread, SVC_STOPPED);
2651         wake_up_all(&mthread->t_ctl_waitq);
2652         spin_unlock(&llmd->llmd_lock);
2653         lfsck_thread_args_fini(lta);
2654
2655         return rc;
2656 }
2657
2658 static int
2659 lfsck_layout_slave_async_interpret(const struct lu_env *env,
2660                                    struct ptlrpc_request *req,
2661                                    void *args, int rc)
2662 {
2663         struct lfsck_layout_slave_async_args *llsaa = args;
2664         struct obd_export                    *exp   = llsaa->llsaa_exp;
2665         struct lfsck_component               *com   = llsaa->llsaa_com;
2666         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
2667         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
2668         bool                                  done  = false;
2669
2670         if (rc != 0) {
2671                 /* It is quite probably caused by target crash,
2672                  * to make the LFSCK can go ahead, assume that
2673                  * the target finished the LFSCK prcoessing. */
2674                 done = true;
2675         } else {
2676                 struct lfsck_reply *lr;
2677
2678                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
2679                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
2680                     lr->lr_status != LS_SCANNING_PHASE2)
2681                         done = true;
2682         }
2683         if (done)
2684                 lfsck_layout_llst_del(llsd, llst);
2685         lfsck_layout_llst_put(llst);
2686         lfsck_component_put(env, com);
2687         class_export_put(exp);
2688
2689         return 0;
2690 }
2691
2692 static int lfsck_layout_async_query(const struct lu_env *env,
2693                                     struct lfsck_component *com,
2694                                     struct obd_export *exp,
2695                                     struct lfsck_layout_slave_target *llst,
2696                                     struct lfsck_request *lr,
2697                                     struct ptlrpc_request_set *set)
2698 {
2699         struct lfsck_layout_slave_async_args *llsaa;
2700         struct ptlrpc_request                *req;
2701         struct lfsck_request                 *tmp;
2702         int                                   rc;
2703         ENTRY;
2704
2705         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
2706         if (req == NULL)
2707                 RETURN(-ENOMEM);
2708
2709         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
2710         if (rc != 0) {
2711                 ptlrpc_request_free(req);
2712                 RETURN(rc);
2713         }
2714
2715         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2716         *tmp = *lr;
2717         ptlrpc_request_set_replen(req);
2718
2719         llsaa = ptlrpc_req_async_args(req);
2720         llsaa->llsaa_exp = exp;
2721         llsaa->llsaa_com = lfsck_component_get(com);
2722         llsaa->llsaa_llst = llst;
2723         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
2724         ptlrpc_set_add_req(set, req);
2725
2726         RETURN(0);
2727 }
2728
2729 static int lfsck_layout_async_notify(const struct lu_env *env,
2730                                      struct obd_export *exp,
2731                                      struct lfsck_request *lr,
2732                                      struct ptlrpc_request_set *set)
2733 {
2734         struct ptlrpc_request   *req;
2735         struct lfsck_request    *tmp;
2736         int                      rc;
2737         ENTRY;
2738
2739         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2740         if (req == NULL)
2741                 RETURN(-ENOMEM);
2742
2743         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2744         if (rc != 0) {
2745                 ptlrpc_request_free(req);
2746                 RETURN(rc);
2747         }
2748
2749         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2750         *tmp = *lr;
2751         ptlrpc_request_set_replen(req);
2752         ptlrpc_set_add_req(set, req);
2753
2754         RETURN(0);
2755 }
2756
2757 static int
2758 lfsck_layout_slave_query_master(const struct lu_env *env,
2759                                 struct lfsck_component *com)
2760 {
2761         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
2762         struct lfsck_instance            *lfsck = com->lc_lfsck;
2763         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2764         struct lfsck_layout_slave_target *llst;
2765         struct obd_export                *exp;
2766         struct ptlrpc_request_set        *set;
2767         int                               rc    = 0;
2768         int                               rc1   = 0;
2769         ENTRY;
2770
2771         set = ptlrpc_prep_set();
2772         if (set == NULL)
2773                 RETURN(-ENOMEM);
2774
2775         memset(lr, 0, sizeof(*lr));
2776         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2777         lr->lr_event = LE_QUERY;
2778         lr->lr_active = LT_LAYOUT;
2779
2780         llsd->llsd_touch_gen++;
2781         spin_lock(&llsd->llsd_lock);
2782         while (!list_empty(&llsd->llsd_master_list)) {
2783                 llst = list_entry(llsd->llsd_master_list.next,
2784                                   struct lfsck_layout_slave_target,
2785                                   llst_list);
2786                 if (llst->llst_gen == llsd->llsd_touch_gen)
2787                         break;
2788
2789                 llst->llst_gen = llsd->llsd_touch_gen;
2790                 list_del(&llst->llst_list);
2791                 list_add_tail(&llst->llst_list,
2792                               &llsd->llsd_master_list);
2793                 atomic_inc(&llst->llst_ref);
2794                 spin_unlock(&llsd->llsd_lock);
2795
2796                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
2797                                                llst->llst_index);
2798                 if (exp == NULL) {
2799                         lfsck_layout_llst_del(llsd, llst);
2800                         lfsck_layout_llst_put(llst);
2801                         spin_lock(&llsd->llsd_lock);
2802                         continue;
2803                 }
2804
2805                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
2806                 if (rc != 0) {
2807                         CERROR("%s: slave fail to query %s for layout: "
2808                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2809                                exp->exp_obd->obd_name, rc);
2810                         rc1 = rc;
2811                         lfsck_layout_llst_put(llst);
2812                         class_export_put(exp);
2813                 }
2814                 spin_lock(&llsd->llsd_lock);
2815         }
2816         spin_unlock(&llsd->llsd_lock);
2817
2818         rc = ptlrpc_set_wait(set);
2819         ptlrpc_set_destroy(set);
2820
2821         RETURN(rc1 != 0 ? rc1 : rc);
2822 }
2823
2824 static void
2825 lfsck_layout_slave_notify_master(const struct lu_env *env,
2826                                  struct lfsck_component *com,
2827                                  enum lfsck_events event, int result)
2828 {
2829         struct lfsck_instance            *lfsck = com->lc_lfsck;
2830         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
2831         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
2832         struct lfsck_layout_slave_target *llst;
2833         struct obd_export                *exp;
2834         struct ptlrpc_request_set        *set;
2835         int                               rc;
2836         ENTRY;
2837
2838         set = ptlrpc_prep_set();
2839         if (set == NULL)
2840                 RETURN_EXIT;
2841
2842         memset(lr, 0, sizeof(*lr));
2843         lr->lr_event = event;
2844         lr->lr_flags = LEF_FROM_OST;
2845         lr->lr_status = result;
2846         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2847         lr->lr_active = LT_LAYOUT;
2848         llsd->llsd_touch_gen++;
2849         spin_lock(&llsd->llsd_lock);
2850         while (!list_empty(&llsd->llsd_master_list)) {
2851                 llst = list_entry(llsd->llsd_master_list.next,
2852                                   struct lfsck_layout_slave_target,
2853                                   llst_list);
2854                 if (llst->llst_gen == llsd->llsd_touch_gen)
2855                         break;
2856
2857                 llst->llst_gen = llsd->llsd_touch_gen;
2858                 list_del(&llst->llst_list);
2859                 list_add_tail(&llst->llst_list,
2860                               &llsd->llsd_master_list);
2861                 atomic_inc(&llst->llst_ref);
2862                 spin_unlock(&llsd->llsd_lock);
2863
2864                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
2865                                                llst->llst_index);
2866                 if (exp == NULL) {
2867                         lfsck_layout_llst_del(llsd, llst);
2868                         lfsck_layout_llst_put(llst);
2869                         spin_lock(&llsd->llsd_lock);
2870                         continue;
2871                 }
2872
2873                 rc = lfsck_layout_async_notify(env, exp, lr, set);
2874                 if (rc != 0)
2875                         CERROR("%s: slave fail to notify %s for layout: "
2876                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2877                                exp->exp_obd->obd_name, rc);
2878                 lfsck_layout_llst_put(llst);
2879                 class_export_put(exp);
2880                 spin_lock(&llsd->llsd_lock);
2881         }
2882         spin_unlock(&llsd->llsd_lock);
2883
2884         ptlrpc_set_wait(set);
2885         ptlrpc_set_destroy(set);
2886
2887         RETURN_EXIT;
2888 }
2889
2890 /* layout APIs */
2891
2892 static int lfsck_layout_reset(const struct lu_env *env,
2893                               struct lfsck_component *com, bool init)
2894 {
2895         struct lfsck_layout     *lo    = com->lc_file_ram;
2896         int                      rc;
2897
2898         down_write(&com->lc_sem);
2899         if (init) {
2900                 memset(lo, 0, com->lc_file_size);
2901         } else {
2902                 __u32 count = lo->ll_success_count;
2903                 __u64 last_time = lo->ll_time_last_complete;
2904
2905                 memset(lo, 0, com->lc_file_size);
2906                 lo->ll_success_count = count;
2907                 lo->ll_time_last_complete = last_time;
2908         }
2909
2910         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
2911         lo->ll_status = LS_INIT;
2912
2913         rc = lfsck_layout_store(env, com);
2914         up_write(&com->lc_sem);
2915
2916         return rc;
2917 }
2918
2919 static void lfsck_layout_fail(const struct lu_env *env,
2920                               struct lfsck_component *com, bool new_checked)
2921 {
2922         struct lfsck_layout *lo = com->lc_file_ram;
2923
2924         down_write(&com->lc_sem);
2925         if (new_checked)
2926                 com->lc_new_checked++;
2927         lo->ll_objs_failed_phase1++;
2928         if (lo->ll_pos_first_inconsistent == 0) {
2929                 struct lfsck_instance *lfsck = com->lc_lfsck;
2930
2931                 lo->ll_pos_first_inconsistent =
2932                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
2933                                                         lfsck->li_di_oit);
2934         }
2935         up_write(&com->lc_sem);
2936 }
2937
2938 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
2939                                           struct lfsck_component *com, bool init)
2940 {
2941         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2942         struct lfsck_layout             *lo      = com->lc_file_ram;
2943         struct lfsck_layout_master_data *llmd    = com->lc_data;
2944         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2945         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
2946         struct l_wait_info               lwi     = { 0 };
2947         int                              rc;
2948
2949         if (com->lc_new_checked == 0 && !init)
2950                 return 0;
2951
2952         l_wait_event(mthread->t_ctl_waitq,
2953                      list_empty(&llmd->llmd_req_list) ||
2954                      !thread_is_running(mthread) ||
2955                      thread_is_stopped(athread),
2956                      &lwi);
2957
2958         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2959                 return 0;
2960
2961         down_write(&com->lc_sem);
2962         if (init) {
2963                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
2964         } else {
2965                 lo->ll_pos_last_checkpoint =
2966                                         lfsck->li_pos_current.lp_oit_cookie;
2967                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2968                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2969                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
2970                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
2971                 com->lc_new_checked = 0;
2972         }
2973
2974         rc = lfsck_layout_store(env, com);
2975         up_write(&com->lc_sem);
2976
2977         return rc;
2978 }
2979
2980 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
2981                                          struct lfsck_component *com, bool init)
2982 {
2983         struct lfsck_instance   *lfsck = com->lc_lfsck;
2984         struct lfsck_layout     *lo    = com->lc_file_ram;
2985         int                      rc;
2986
2987         if (com->lc_new_checked == 0 && !init)
2988                 return 0;
2989
2990         down_write(&com->lc_sem);
2991
2992         if (init) {
2993                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
2994         } else {
2995                 lo->ll_pos_last_checkpoint =
2996                                         lfsck->li_pos_current.lp_oit_cookie;
2997                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
2998                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2999                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3000                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3001                 com->lc_new_checked = 0;
3002         }
3003
3004         rc = lfsck_layout_store(env, com);
3005
3006         up_write(&com->lc_sem);
3007
3008         return rc;
3009 }
3010
3011 static int lfsck_layout_prep(const struct lu_env *env,
3012                              struct lfsck_component *com,
3013                              struct lfsck_start *start)
3014 {
3015         struct lfsck_instance   *lfsck  = com->lc_lfsck;
3016         struct lfsck_layout     *lo     = com->lc_file_ram;
3017         struct lfsck_position   *pos    = &com->lc_pos_start;
3018
3019         fid_zero(&pos->lp_dir_parent);
3020         pos->lp_dir_cookie = 0;
3021         if (lo->ll_status == LS_COMPLETED ||
3022             lo->ll_status == LS_PARTIAL ||
3023             /* To handle orphan, must scan from the beginning. */
3024             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
3025                 int rc;
3026
3027                 rc = lfsck_layout_reset(env, com, false);
3028                 if (rc != 0)
3029                         return rc;
3030         }
3031
3032         down_write(&com->lc_sem);
3033         lo->ll_time_latest_start = cfs_time_current_sec();
3034         spin_lock(&lfsck->li_lock);
3035         if (lo->ll_flags & LF_SCANNED_ONCE) {
3036                 if (!lfsck->li_drop_dryrun ||
3037                     lo->ll_pos_first_inconsistent == 0) {
3038                         lo->ll_status = LS_SCANNING_PHASE2;
3039                         list_del_init(&com->lc_link);
3040                         list_add_tail(&com->lc_link,
3041                                       &lfsck->li_list_double_scan);
3042                         pos->lp_oit_cookie = 0;
3043                 } else {
3044                         int i;
3045
3046                         lo->ll_status = LS_SCANNING_PHASE1;
3047                         lo->ll_run_time_phase1 = 0;
3048                         lo->ll_run_time_phase2 = 0;
3049                         lo->ll_objs_checked_phase1 = 0;
3050                         lo->ll_objs_checked_phase2 = 0;
3051                         lo->ll_objs_failed_phase1 = 0;
3052                         lo->ll_objs_failed_phase2 = 0;
3053                         for (i = 0; i < LLIT_MAX; i++)
3054                                 lo->ll_objs_repaired[i] = 0;
3055
3056                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3057                         fid_zero(&com->lc_fid_latest_scanned_phase2);
3058                 }
3059         } else {
3060                 lo->ll_status = LS_SCANNING_PHASE1;
3061                 if (!lfsck->li_drop_dryrun ||
3062                     lo->ll_pos_first_inconsistent == 0)
3063                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
3064                 else
3065                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
3066         }
3067         spin_unlock(&lfsck->li_lock);
3068         up_write(&com->lc_sem);
3069
3070         return 0;
3071 }
3072
3073 static int lfsck_layout_slave_prep(const struct lu_env *env,
3074                                    struct lfsck_component *com,
3075                                    struct lfsck_start_param *lsp)
3076 {
3077         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3078         struct lfsck_start              *start  = lsp->lsp_start;
3079         int                              rc;
3080
3081         rc = lfsck_layout_prep(env, com, start);
3082         if (rc != 0 || !lsp->lsp_index_valid)
3083                 return rc;
3084
3085         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
3086         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
3087                 LASSERT(!llsd->llsd_rbtree_valid);
3088
3089                 write_lock(&llsd->llsd_rb_lock);
3090                 rc = lfsck_rbtree_setup(env, com);
3091                 write_unlock(&llsd->llsd_rb_lock);
3092         }
3093
3094         return rc;
3095 }
3096
3097 static int lfsck_layout_master_prep(const struct lu_env *env,
3098                                     struct lfsck_component *com,
3099                                     struct lfsck_start_param *lsp)
3100 {
3101         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3102         struct lfsck_layout_master_data *llmd    = com->lc_data;
3103         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3104         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3105         struct lfsck_thread_args        *lta;
3106         long                             rc;
3107         ENTRY;
3108
3109         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
3110         if (rc != 0)
3111                 RETURN(rc);
3112
3113         llmd->llmd_assistant_status = 0;
3114         llmd->llmd_post_result = 0;
3115         llmd->llmd_to_post = 0;
3116         llmd->llmd_to_double_scan = 0;
3117         llmd->llmd_in_double_scan = 0;
3118         llmd->llmd_exit = 0;
3119         thread_set_flags(athread, 0);
3120
3121         lta = lfsck_thread_args_init(lfsck, com, lsp);
3122         if (IS_ERR(lta))
3123                 RETURN(PTR_ERR(lta));
3124
3125         rc = PTR_ERR(kthread_run(lfsck_layout_assistant, lta, "lfsck_layout"));
3126         if (IS_ERR_VALUE(rc)) {
3127                 CERROR("%s: Cannot start LFSCK layout assistant thread: "
3128                        "rc = %ld\n", lfsck_lfsck2name(lfsck), rc);
3129                 lfsck_thread_args_fini(lta);
3130         } else {
3131                 struct l_wait_info lwi = { 0 };
3132
3133                 l_wait_event(mthread->t_ctl_waitq,
3134                              thread_is_running(athread) ||
3135                              thread_is_stopped(athread),
3136                              &lwi);
3137                 if (unlikely(!thread_is_running(athread)))
3138                         rc = llmd->llmd_assistant_status;
3139                 else
3140                         rc = 0;
3141         }
3142
3143         RETURN(rc);
3144 }
3145
3146 /* Pre-fetch the attribute for each stripe in the given layout EA. */
3147 static int lfsck_layout_scan_stripes(const struct lu_env *env,
3148                                      struct lfsck_component *com,
3149                                      struct dt_object *parent,
3150                                      struct lov_mds_md_v1 *lmm)
3151 {
3152         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3153         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3154         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3155         struct lfsck_layout             *lo      = com->lc_file_ram;
3156         struct lfsck_layout_master_data *llmd    = com->lc_data;
3157         struct lfsck_layout_object      *llo     = NULL;
3158         struct lov_ost_data_v1          *objs;
3159         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
3160         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3161         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3162                 struct l_wait_info       lwi     = { 0 };
3163         struct lu_buf                   *buf;
3164         int                              rc      = 0;
3165         int                              i;
3166         __u32                            magic;
3167         __u16                            count;
3168         __u16                            gen;
3169         ENTRY;
3170
3171         buf = lfsck_buf_get(env, &info->lti_old_pfid,
3172                             sizeof(struct filter_fid_old));
3173         count = le16_to_cpu(lmm->lmm_stripe_count);
3174         gen = le16_to_cpu(lmm->lmm_layout_gen);
3175         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3176          * been verified in lfsck_layout_verify_header() already. If some
3177          * new magic introduced in the future, then layout LFSCK needs to
3178          * be updated also. */
3179         magic = le32_to_cpu(lmm->lmm_magic);
3180         if (magic == LOV_MAGIC_V1) {
3181                 objs = &(lmm->lmm_objects[0]);
3182         } else {
3183                 LASSERT(magic == LOV_MAGIC_V3);
3184                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3185         }
3186
3187         for (i = 0; i < count; i++, objs++) {
3188                 struct lu_fid           *fid    = &info->lti_fid;
3189                 struct ost_id           *oi     = &info->lti_oi;
3190                 struct lfsck_layout_req *llr;
3191                 struct lfsck_tgt_desc   *tgt    = NULL;
3192                 struct dt_object        *cobj   = NULL;
3193                 __u32                    index  =
3194                                         le32_to_cpu(objs->l_ost_idx);
3195                 bool                     wakeup = false;
3196
3197                 l_wait_event(mthread->t_ctl_waitq,
3198                              bk->lb_async_windows == 0 ||
3199                              llmd->llmd_prefetched < bk->lb_async_windows ||
3200                              !thread_is_running(mthread) ||
3201                              thread_is_stopped(athread),
3202                              &lwi);
3203
3204                 if (unlikely(!thread_is_running(mthread)) ||
3205                              thread_is_stopped(athread))
3206                         GOTO(out, rc = 0);
3207
3208                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3209                 ostid_to_fid(fid, oi, index);
3210                 tgt = lfsck_tgt_get(ltds, index);
3211                 if (unlikely(tgt == NULL)) {
3212                         CERROR("%s: Cannot talk with OST %x which did not join "
3213                                "the layout LFSCK.\n",
3214                                lfsck_lfsck2name(lfsck), index);
3215                         lo->ll_flags |= LF_INCOMPLETE;
3216                         goto next;
3217                 }
3218
3219                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
3220                 if (IS_ERR(cobj)) {
3221                         rc = PTR_ERR(cobj);
3222                         goto next;
3223                 }
3224
3225                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
3226                 if (rc != 0)
3227                         goto next;
3228
3229                 rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
3230                                           BYPASS_CAPA);
3231                 if (rc != 0)
3232                         goto next;
3233
3234                 if (llo == NULL) {
3235                         llo = lfsck_layout_object_init(env, parent, gen);
3236                         if (IS_ERR(llo)) {
3237                                 rc = PTR_ERR(llo);
3238                                 goto next;
3239                         }
3240                 }
3241
3242                 llr = lfsck_layout_req_init(llo, cobj, index, i);
3243                 if (IS_ERR(llr)) {
3244                         rc = PTR_ERR(llr);
3245                         goto next;
3246                 }
3247
3248                 cobj = NULL;
3249                 spin_lock(&llmd->llmd_lock);
3250                 if (llmd->llmd_assistant_status < 0) {
3251                         spin_unlock(&llmd->llmd_lock);
3252                         lfsck_layout_req_fini(env, llr);
3253                         lfsck_tgt_put(tgt);
3254                         RETURN(llmd->llmd_assistant_status);
3255                 }
3256
3257                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
3258                 if (llmd->llmd_prefetched == 0)
3259                         wakeup = true;
3260
3261                 llmd->llmd_prefetched++;
3262                 spin_unlock(&llmd->llmd_lock);
3263                 if (wakeup)
3264                         wake_up_all(&athread->t_ctl_waitq);
3265
3266 next:
3267                 down_write(&com->lc_sem);
3268                 com->lc_new_checked++;
3269                 if (rc < 0)
3270                         lo->ll_objs_failed_phase1++;
3271                 up_write(&com->lc_sem);
3272
3273                 if (cobj != NULL && !IS_ERR(cobj))
3274                         lu_object_put(env, &cobj->do_lu);
3275
3276                 if (likely(tgt != NULL))
3277                         lfsck_tgt_put(tgt);
3278
3279                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3280                         GOTO(out, rc);
3281         }
3282
3283         GOTO(out, rc = 0);
3284
3285 out:
3286         if (llo != NULL && !IS_ERR(llo))
3287                 lfsck_layout_object_put(env, llo);
3288
3289         return rc;
3290 }
3291
3292 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
3293  * the OST-object's attribute and generate an structure lfsck_layout_req on the
3294  * list ::llmd_req_list.
3295  *
3296  * For each request on above list, the lfsck_layout_assistant thread compares
3297  * the OST side attribute with local attribute, if inconsistent, then repair it.
3298  *
3299  * All above processing is async mode with pipeline. */
3300 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
3301                                         struct lfsck_component *com,
3302                                         struct dt_object *obj)
3303 {
3304         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3305         struct ost_id                   *oi     = &info->lti_oi;
3306         struct lfsck_layout             *lo     = com->lc_file_ram;
3307         struct lfsck_layout_master_data *llmd   = com->lc_data;
3308         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3309         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
3310         struct thandle                  *handle = NULL;
3311         struct lu_buf                   *buf    = &info->lti_big_buf;
3312         struct lov_mds_md_v1            *lmm    = NULL;
3313         struct dt_device                *dev    = lfsck->li_bottom;
3314         struct lustre_handle             lh     = { 0 };
3315         ssize_t                          buflen = buf->lb_len;
3316         int                              rc     = 0;
3317         bool                             locked = false;
3318         bool                             stripe = false;
3319         ENTRY;
3320
3321         if (!S_ISREG(lfsck_object_type(obj)))
3322                 GOTO(out, rc = 0);
3323
3324         if (llmd->llmd_assistant_status < 0)
3325                 GOTO(out, rc = -ESRCH);
3326
3327         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
3328         lmm_oi_cpu_to_le(oi, oi);
3329         dt_read_lock(env, obj, 0);
3330         locked = true;
3331
3332 again:
3333         rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
3334         if (rc <= 0)
3335                 GOTO(out, rc);
3336
3337         buf->lb_len = rc;
3338         lmm = buf->lb_buf;
3339         rc = lfsck_layout_verify_header(lmm);
3340         if (rc != 0)
3341                 GOTO(out, rc);
3342
3343         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
3344                 GOTO(out, stripe = true);
3345
3346         /* Inconsistent lmm_oi, should be repaired. */
3347         CDEBUG(D_LFSCK, "Repair bad lmm_oi for "DFID"\n",
3348                PFID(lfsck_dto2fid(obj)));
3349
3350         if (bk->lb_param & LPF_DRYRUN) {
3351                 down_write(&com->lc_sem);
3352                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
3353                 up_write(&com->lc_sem);
3354
3355                 GOTO(out, stripe = true);
3356         }
3357
3358         if (!lustre_handle_is_used(&lh)) {
3359                 dt_read_unlock(env, obj);
3360                 locked = false;
3361                 buf->lb_len = buflen;
3362                 rc = lfsck_layout_lock(env, com, obj, &lh,
3363                                        MDS_INODELOCK_LAYOUT |
3364                                        MDS_INODELOCK_XATTR);
3365                 if (rc != 0)
3366                         GOTO(out, rc);
3367
3368                 handle = dt_trans_create(env, dev);
3369                 if (IS_ERR(handle))
3370                         GOTO(out, rc = PTR_ERR(handle));
3371
3372                 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
3373                                           LU_XATTR_REPLACE, handle);
3374                 if (rc != 0)
3375                         GOTO(out, rc);
3376
3377                 rc = dt_trans_start_local(env, dev, handle);
3378                 if (rc != 0)
3379                         GOTO(out, rc);
3380
3381                 dt_write_lock(env, obj, 0);
3382                 locked = true;
3383
3384                 goto again;
3385         }
3386
3387         lmm->lmm_oi = *oi;
3388         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
3389                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3390         if (rc != 0)
3391                 GOTO(out, rc);
3392
3393         down_write(&com->lc_sem);
3394         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
3395         up_write(&com->lc_sem);
3396
3397         GOTO(out, stripe = true);
3398
3399 out:
3400         if (locked) {
3401                 if (lustre_handle_is_used(&lh))
3402                         dt_write_unlock(env, obj);
3403                 else
3404                         dt_read_unlock(env, obj);
3405         }
3406
3407         if (handle != NULL && !IS_ERR(handle))
3408                 dt_trans_stop(env, dev, handle);
3409
3410         lfsck_layout_unlock(&lh);
3411         if (stripe) {
3412                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
3413         } else {
3414                 down_write(&com->lc_sem);
3415                 com->lc_new_checked++;
3416                 if (rc < 0)
3417                         lo->ll_objs_failed_phase1++;
3418                 up_write(&com->lc_sem);
3419         }
3420         buf->lb_len = buflen;
3421
3422         return rc;
3423 }
3424
3425 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
3426                                        struct lfsck_component *com,
3427                                        struct dt_object *obj)
3428 {
3429         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3430         struct lfsck_layout             *lo     = com->lc_file_ram;
3431         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
3432         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3433         struct lfsck_layout_seq         *lls;
3434         __u64                            seq;
3435         __u64                            oid;
3436         int                              rc;
3437         ENTRY;
3438
3439         LASSERT(llsd != NULL);
3440
3441         lfsck_rbtree_update_bitmap(env, com, fid, false);
3442
3443         down_write(&com->lc_sem);
3444         if (fid_is_idif(fid))
3445                 seq = 0;
3446         else if (!fid_is_norm(fid) ||
3447                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
3448                 GOTO(unlock, rc = 0);
3449         else
3450                 seq = fid_seq(fid);
3451         com->lc_new_checked++;
3452
3453         lls = lfsck_layout_seq_lookup(llsd, seq);
3454         if (lls == NULL) {
3455                 OBD_ALLOC_PTR(lls);
3456                 if (unlikely(lls == NULL))
3457                         GOTO(unlock, rc = -ENOMEM);
3458
3459                 INIT_LIST_HEAD(&lls->lls_list);
3460                 lls->lls_seq = seq;
3461                 rc = lfsck_layout_lastid_load(env, com, lls);
3462                 if (rc != 0) {
3463                         lo->ll_objs_failed_phase1++;
3464                         OBD_FREE_PTR(lls);
3465                         GOTO(unlock, rc);
3466                 }
3467
3468                 lfsck_layout_seq_insert(llsd, lls);
3469         }
3470
3471         if (unlikely(fid_is_last_id(fid)))
3472                 GOTO(unlock, rc = 0);
3473
3474         oid = fid_oid(fid);
3475         if (oid > lls->lls_lastid_known)
3476                 lls->lls_lastid_known = oid;
3477
3478         if (oid > lls->lls_lastid) {
3479                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
3480                         /* OFD may create new objects during LFSCK scanning. */
3481                         rc = lfsck_layout_lastid_reload(env, com, lls);
3482                         if (unlikely(rc != 0))
3483                                 CWARN("%s: failed to reload LAST_ID for "LPX64
3484                                       ": rc = %d\n",
3485                                       lfsck_lfsck2name(com->lc_lfsck),
3486                                       lls->lls_seq, rc);
3487                         if (oid <= lls->lls_lastid)
3488                                 GOTO(unlock, rc = 0);
3489
3490                         LASSERT(lfsck->li_out_notify != NULL);
3491
3492                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
3493                                              LE_LASTID_REBUILDING);
3494                         lo->ll_flags |= LF_CRASHED_LASTID;
3495                 }
3496
3497                 lls->lls_lastid = oid;
3498                 lls->lls_dirty = 1;
3499         }
3500
3501         GOTO(unlock, rc = 0);
3502
3503 unlock:
3504         up_write(&com->lc_sem);
3505
3506         return rc;
3507 }
3508
/*
 * Directory entry handler of the layout LFSCK component.
 *
 * Nothing is done with the entry: none of the arguments is used and the
 * function always returns 0.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
                                 struct lfsck_component *com,
                                 struct dt_object *obj,
                                 struct lu_dirent *ent)
{
        int rc = 0;

        return rc;
}
3516
3517 static int lfsck_layout_master_post(const struct lu_env *env,
3518                                     struct lfsck_component *com,
3519                                     int result, bool init)
3520 {
3521         struct lfsck_instance           *lfsck   = com->lc_lfsck;
3522         struct lfsck_layout             *lo      = com->lc_file_ram;
3523         struct lfsck_layout_master_data *llmd    = com->lc_data;
3524         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3525         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3526         struct l_wait_info               lwi     = { 0 };
3527         int                              rc;
3528         ENTRY;
3529
3530
3531         llmd->llmd_post_result = result;
3532         llmd->llmd_to_post = 1;
3533         if (llmd->llmd_post_result <= 0)
3534                 llmd->llmd_exit = 1;
3535
3536         wake_up_all(&athread->t_ctl_waitq);
3537         l_wait_event(mthread->t_ctl_waitq,
3538                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
3539                      thread_is_stopped(athread),
3540                      &lwi);
3541
3542         if (llmd->llmd_assistant_status < 0)
3543                 result = llmd->llmd_assistant_status;
3544
3545         down_write(&com->lc_sem);
3546         spin_lock(&lfsck->li_lock);
3547         /* When LFSCK failed, there may be some prefetched objects those are
3548          * not been processed yet, we do not know the exactly position, then
3549          * just restart from last check-point next time. */
3550         if (!init && !llmd->llmd_exit)
3551                 lo->ll_pos_last_checkpoint =
3552                                         lfsck->li_pos_current.lp_oit_cookie;
3553
3554         if (result > 0) {
3555                 lo->ll_status = LS_SCANNING_PHASE2;
3556                 lo->ll_flags |= LF_SCANNED_ONCE;
3557                 lo->ll_flags &= ~LF_UPGRADE;
3558                 list_del_init(&com->lc_link);
3559                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
3560         } else if (result == 0) {
3561                 lo->ll_status = lfsck->li_status;
3562                 if (lo->ll_status == 0)
3563                         lo->ll_status = LS_STOPPED;
3564                 if (lo->ll_status != LS_PAUSED) {
3565                         list_del_init(&com->lc_link);
3566                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
3567                 }
3568         } else {
3569                 lo->ll_status = LS_FAILED;
3570                 list_del_init(&com->lc_link);
3571                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
3572         }
3573         spin_unlock(&lfsck->li_lock);
3574
3575         if (!init) {
3576                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3577                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3578                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3579                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3580                 com->lc_new_checked = 0;
3581         }
3582
3583         rc = lfsck_layout_store(env, com);
3584         up_write(&com->lc_sem);
3585
3586         RETURN(rc);
3587 }
3588
3589 static int lfsck_layout_slave_post(const struct lu_env *env,
3590                                    struct lfsck_component *com,
3591                                    int result, bool init)
3592 {
3593         struct lfsck_instance   *lfsck = com->lc_lfsck;
3594         struct lfsck_layout     *lo    = com->lc_file_ram;
3595         int                      rc;
3596         bool                     done  = false;
3597
3598         rc = lfsck_layout_lastid_store(env, com);
3599         if (rc != 0)
3600                 result = rc;
3601
3602         LASSERT(lfsck->li_out_notify != NULL);
3603
3604         down_write(&com->lc_sem);
3605
3606         spin_lock(&lfsck->li_lock);
3607         if (!init)
3608                 lo->ll_pos_last_checkpoint =
3609                                         lfsck->li_pos_current.lp_oit_cookie;
3610         if (result > 0) {
3611                 lo->ll_status = LS_SCANNING_PHASE2;
3612                 lo->ll_flags |= LF_SCANNED_ONCE;
3613                 if (lo->ll_flags & LF_CRASHED_LASTID) {
3614                         done = true;
3615                         lo->ll_flags &= ~LF_CRASHED_LASTID;
3616                 }
3617                 lo->ll_flags &= ~LF_UPGRADE;
3618                 list_del_init(&com->lc_link);
3619                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
3620         } else if (result == 0) {
3621                 lo->ll_status = lfsck->li_status;
3622                 if (lo->ll_status == 0)
3623                         lo->ll_status = LS_STOPPED;
3624                 if (lo->ll_status != LS_PAUSED) {
3625                         list_del_init(&com->lc_link);
3626                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
3627                 }
3628         } else {
3629                 lo->ll_status = LS_FAILED;
3630                 list_del_init(&com->lc_link);
3631                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
3632         }
3633         spin_unlock(&lfsck->li_lock);
3634
3635         if (done)
3636                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
3637                                      LE_LASTID_REBUILT);
3638
3639         if (!init) {
3640                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
3641                                 HALF_SEC - lfsck->li_time_last_checkpoint);
3642                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
3643                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
3644                 com->lc_new_checked = 0;
3645         }
3646
3647         rc = lfsck_layout_store(env, com);
3648
3649         up_write(&com->lc_sem);
3650
3651         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
3652
3653         if (result <= 0)
3654                 lfsck_rbtree_cleanup(env, com);
3655
3656         return rc;
3657 }
3658
3659 static int lfsck_layout_dump(const struct lu_env *env,
3660                              struct lfsck_component *com, char *buf, int len)
3661 {
3662         struct lfsck_instance   *lfsck = com->lc_lfsck;
3663         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
3664         struct lfsck_layout     *lo    = com->lc_file_ram;
3665         int                      save  = len;
3666         int                      ret   = -ENOSPC;
3667         int                      rc;
3668
3669         down_read(&com->lc_sem);
3670         rc = snprintf(buf, len,
3671                       "name: lfsck_layout\n"
3672                       "magic: %#x\n"
3673                       "version: %d\n"
3674                       "status: %s\n",
3675                       lo->ll_magic,
3676                       bk->lb_version,
3677                       lfsck_status2names(lo->ll_status));
3678         if (rc <= 0)
3679                 goto out;
3680
3681         buf += rc;
3682         len -= rc;
3683         rc = lfsck_bits_dump(&buf, &len, lo->ll_flags, lfsck_flags_names,
3684                              "flags");
3685         if (rc < 0)
3686                 goto out;
3687
3688         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
3689                              "param");
3690         if (rc < 0)
3691                 goto out;
3692
3693         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_complete,
3694                              "time_since_last_completed");
3695         if (rc < 0)
3696                 goto out;
3697
3698         rc = lfsck_time_dump(&buf, &len, lo->ll_time_latest_start,
3699                              "time_since_latest_start");
3700         if (rc < 0)
3701                 goto out;
3702
3703         rc = lfsck_time_dump(&buf, &len, lo->ll_time_last_checkpoint,
3704                              "time_since_last_checkpoint");
3705         if (rc < 0)
3706                 goto out;
3707
3708         rc = snprintf(buf, len,
3709                       "latest_start_position: "LPU64"\n"
3710                       "last_checkpoint_position: "LPU64"\n"
3711                       "first_failure_position: "LPU64"\n",
3712                       lo->ll_pos_latest_start,
3713                       lo->ll_pos_last_checkpoint,
3714                       lo->ll_pos_first_inconsistent);
3715         if (rc <= 0)
3716                 goto out;
3717
3718         buf += rc;
3719         len -= rc;
3720
3721         rc = snprintf(buf, len,
3722                       "success_count: %u\n"
3723                       "repaired_dangling: "LPU64"\n"
3724                       "repaired_unmatched_pair: "LPU64"\n"
3725                       "repaired_multiple_referenced: "LPU64"\n"
3726                       "repaired_orphan: "LPU64"\n"
3727                       "repaired_inconsistent_owner: "LPU64"\n"
3728                       "repaired_others: "LPU64"\n"
3729                       "skipped: "LPU64"\n"
3730                       "failed_phase1: "LPU64"\n"
3731                       "failed_phase2: "LPU64"\n",
3732                       lo->ll_success_count,
3733                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
3734                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
3735                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
3736                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
3737                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
3738                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
3739                       lo->ll_objs_skipped,
3740                       lo->ll_objs_failed_phase1,
3741                       lo->ll_objs_failed_phase2);
3742         if (rc <= 0)
3743                 goto out;
3744
3745         buf += rc;
3746         len -= rc;
3747
3748         if (lo->ll_status == LS_SCANNING_PHASE1) {
3749                 __u64 pos;
3750                 const struct dt_it_ops *iops;
3751                 cfs_duration_t duration = cfs_time_current() -
3752                                           lfsck->li_time_last_checkpoint;
3753                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
3754                 __u64 speed = checked;
3755                 __u64 new_checked = com->lc_new_checked * HZ;
3756                 __u32 rtime = lo->ll_run_time_phase1 +
3757                               cfs_duration_sec(duration + HALF_SEC);
3758
3759                 if (duration != 0)
3760                         do_div(new_checked, duration);
3761                 if (rtime != 0)
3762                         do_div(speed, rtime);
3763                 rc = snprintf(buf, len,
3764                               "checked_phase1: "LPU64"\n"
3765                               "checked_phase2: "LPU64"\n"
3766                               "run_time_phase1: %u seconds\n"
3767                               "run_time_phase2: %u seconds\n"
3768                               "average_speed_phase1: "LPU64" items/sec\n"
3769                               "average_speed_phase2: N/A\n"
3770                               "real-time_speed_phase1: "LPU64" items/sec\n"
3771                               "real-time_speed_phase2: N/A\n",
3772                               checked,
3773                               lo->ll_objs_checked_phase2,
3774                               rtime,
3775                               lo->ll_run_time_phase2,
3776                               speed,
3777                               new_checked);
3778                 if (rc <= 0)
3779                         goto out;
3780
3781                 buf += rc;
3782                 len -= rc;
3783
3784                 LASSERT(lfsck->li_di_oit != NULL);
3785
3786                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
3787
3788                 /* The low layer otable-based iteration position may NOT
3789                  * exactly match the layout-based directory traversal
3790                  * cookie. Generally, it is not a serious issue. But the
3791                  * caller should NOT make assumption on that. */
3792                 pos = iops->store(env, lfsck->li_di_oit);
3793                 if (!lfsck->li_current_oit_processed)
3794                         pos--;
3795                 rc = snprintf(buf, len, "current_position: "LPU64"\n", pos);
3796                 if (rc <= 0)
3797                         goto out;
3798
3799                 buf += rc;
3800                 len -= rc;
3801         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
3802                 cfs_duration_t duration = cfs_time_current() -
3803                                           lfsck->li_time_last_checkpoint;
3804                 __u64 checked = lo->ll_objs_checked_phase1 + com->lc_new_checked;
3805                 __u64 speed = checked;
3806                 __u64 new_checked = com->lc_new_checked * HZ;
3807                 __u32 rtime = lo->ll_run_time_phase1 +
3808                               cfs_duration_sec(duration + HALF_SEC);
3809
3810                 if (duration != 0)
3811                         do_div(new_checked, duration);
3812                 if (rtime != 0)
3813                         do_div(speed, rtime);
3814                 rc = snprintf(buf, len,
3815                               "checked_phase1: "LPU64"\n"
3816                               "checked_phase2: "LPU64"\n"
3817                               "run_time_phase1: %u seconds\n"
3818                               "run_time_phase2: %u seconds\n"
3819                               "average_speed_phase1: "LPU64" items/sec\n"
3820                               "average_speed_phase2: N/A\n"
3821                               "real-time_speed_phase1: "LPU64" items/sec\n"
3822                               "real-time_speed_phase2: N/A\n"
3823                               "current_position: "DFID"\n",
3824                               checked,
3825                               lo->ll_objs_checked_phase2,
3826                               rtime,
3827                               lo->ll_run_time_phase2,
3828                               speed,
3829                               new_checked,
3830                               PFID(&com->lc_fid_latest_scanned_phase2));
3831                 if (rc <= 0)
3832                         goto out;
3833
3834                 buf += rc;
3835                 len -= rc;
3836         } else {
3837                 __u64 speed1 = lo->ll_objs_checked_phase1;
3838                 __u64 speed2 = lo->ll_objs_checked_phase2;
3839
3840                 if (lo->ll_run_time_phase1 != 0)
3841                         do_div(speed1, lo->ll_run_time_phase1);
3842                 if (lo->ll_run_time_phase2 != 0)
3843                         do_div(speed2, lo->ll_run_time_phase2);
3844                 rc = snprintf(buf, len,
3845                               "checked_phase1: "LPU64"\n"
3846                               "checked_phase2: "LPU64"\n"
3847                               "run_time_phase1: %u seconds\n"
3848                               "run_time_phase2: %u seconds\n"
3849                               "average_speed_phase1: "LPU64" items/sec\n"
3850                               "average_speed_phase2: "LPU64" objs/sec\n"
3851                               "real-time_speed_phase1: N/A\n"
3852                               "real-time_speed_phase2: N/A\n"
3853                               "current_position: N/A\n",
3854                               lo->ll_objs_checked_phase1,
3855                               lo->ll_objs_checked_phase2,
3856                               lo->ll_run_time_phase1,
3857                               lo->ll_run_time_phase2,
3858                               speed1,
3859                               speed2);
3860                 if (rc <= 0)
3861                         goto out;
3862
3863                 buf += rc;
3864                 len -= rc;
3865         }
3866         ret = save - len;
3867
3868 out:
3869         up_read(&com->lc_sem);
3870
3871         return ret;
3872 }
3873
3874 static int lfsck_layout_master_double_scan(const struct lu_env *env,
3875                                            struct lfsck_component *com)
3876 {
3877         struct lfsck_layout_master_data *llmd    = com->lc_data;
3878         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
3879         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3880         struct lfsck_layout             *lo      = com->lc_file_ram;
3881         struct l_wait_info               lwi     = { 0 };
3882
3883         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
3884                 return 0;
3885
3886         llmd->llmd_to_double_scan = 1;
3887         wake_up_all(&athread->t_ctl_waitq);
3888         l_wait_event(mthread->t_ctl_waitq,
3889                      llmd->llmd_in_double_scan ||
3890                      thread_is_stopped(athread),
3891                      &lwi);
3892         if (llmd->llmd_assistant_status < 0)
3893                 return llmd->llmd_assistant_status;
3894
3895         return 0;
3896 }
3897
3898 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
3899                                           struct lfsck_component *com)
3900 {
3901         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3902         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
3903         struct lfsck_layout             *lo     = com->lc_file_ram;
3904         struct ptlrpc_thread            *thread = &lfsck->li_thread;
3905         int                              rc;
3906         ENTRY;
3907
3908         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
3909                 lfsck_rbtree_cleanup(env, com);
3910                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
3911                 RETURN(0);
3912         }
3913
3914         atomic_inc(&lfsck->li_double_scan_count);
3915
3916         com->lc_new_checked = 0;
3917         com->lc_new_scanned = 0;
3918         com->lc_time_last_checkpoint = cfs_time_current();
3919         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
3920                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3921
3922         while (1) {
3923                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
3924                                                      NULL, NULL);
3925
3926                 rc = lfsck_layout_slave_query_master(env, com);
3927                 if (list_empty(&llsd->llsd_master_list)) {
3928                         if (unlikely(!thread_is_running(thread)))
3929                                 rc = 0;
3930                         else
3931                                 rc = 1;
3932
3933                         GOTO(done, rc);
3934                 }
3935
3936                 if (rc < 0)
3937                         GOTO(done, rc);
3938
3939                 rc = l_wait_event(thread->t_ctl_waitq,
3940                                   !thread_is_running(thread) ||
3941                                   list_empty(&llsd->llsd_master_list),
3942                                   &lwi);
3943                 if (unlikely(!thread_is_running(thread)))
3944                         GOTO(done, rc = 0);
3945
3946                 if (rc == -ETIMEDOUT)
3947                         continue;
3948
3949                 GOTO(done, rc = (rc < 0 ? rc : 1));
3950         }
3951
3952 done:
3953         rc = lfsck_layout_double_scan_result(env, com, rc);
3954
3955         lfsck_rbtree_cleanup(env, com);
3956         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
3957         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
3958                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
3959
3960         return rc;
3961 }
3962
3963 static void lfsck_layout_master_data_release(const struct lu_env *env,
3964                                              struct lfsck_component *com)
3965 {
3966         struct lfsck_layout_master_data *llmd   = com->lc_data;
3967         struct lfsck_instance           *lfsck  = com->lc_lfsck;
3968         struct lfsck_tgt_descs          *ltds;
3969         struct lfsck_tgt_desc           *ltd;
3970         struct lfsck_tgt_desc           *next;
3971
3972         LASSERT(llmd != NULL);
3973         LASSERT(thread_is_init(&llmd->llmd_thread) ||
3974                 thread_is_stopped(&llmd->llmd_thread));
3975         LASSERT(list_empty(&llmd->llmd_req_list));
3976
3977         com->lc_data = NULL;
3978
3979         ltds = &lfsck->li_ost_descs;
3980         spin_lock(&ltds->ltd_lock);
3981         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
3982                                  ltd_layout_phase_list) {
3983                 list_del_init(&ltd->ltd_layout_phase_list);
3984         }
3985         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
3986                                  ltd_layout_phase_list) {
3987                 list_del_init(&ltd->ltd_layout_phase_list);
3988         }
3989         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
3990                                  ltd_layout_list) {
3991                 list_del_init(&ltd->ltd_layout_list);
3992         }
3993         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
3994                                  ltd_layout_phase_list) {
3995                 list_del_init(&ltd->ltd_layout_phase_list);
3996         }
3997         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
3998                                  ltd_layout_phase_list) {
3999                 list_del_init(&ltd->ltd_layout_phase_list);
4000         }
4001         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
4002                                  ltd_layout_list) {
4003                 list_del_init(&ltd->ltd_layout_list);
4004         }
4005         spin_unlock(&ltds->ltd_lock);
4006
4007         OBD_FREE_PTR(llmd);
4008 }
4009
4010 static void lfsck_layout_slave_data_release(const struct lu_env *env,
4011                                             struct lfsck_component *com)
4012 {
4013         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4014         struct lfsck_layout_seq          *lls;
4015         struct lfsck_layout_seq          *next;
4016         struct lfsck_layout_slave_target *llst;
4017         struct lfsck_layout_slave_target *tmp;
4018
4019         LASSERT(llsd != NULL);
4020
4021         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
4022                                      lls_list) {
4023                 list_del_init(&lls->lls_list);
4024                 lfsck_object_put(env, lls->lls_lastid_obj);
4025                 OBD_FREE_PTR(lls);
4026         }
4027
4028         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
4029                                  llst_list) {
4030                 list_del_init(&llst->llst_list);
4031                 OBD_FREE_PTR(llst);
4032         }
4033
4034         lfsck_rbtree_cleanup(env, com);
4035         com->lc_data = NULL;
4036         OBD_FREE_PTR(llsd);
4037 }
4038
4039 static void lfsck_layout_master_quit(const struct lu_env *env,
4040                                      struct lfsck_component *com)
4041 {
4042         struct lfsck_layout_master_data *llmd    = com->lc_data;
4043         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
4044         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4045         struct l_wait_info               lwi     = { 0 };
4046
4047         llmd->llmd_exit = 1;
4048         wake_up_all(&athread->t_ctl_waitq);
4049         l_wait_event(mthread->t_ctl_waitq,
4050                      thread_is_init(athread) ||
4051                      thread_is_stopped(athread),
4052                      &lwi);
4053 }
4054
/**
 * Slave-side quit handler: the only work is cleaning up the rbtree.
 *
 * \param[in] env	execution environment
 * \param[in] com	the layout LFSCK component
 */
static void
lfsck_layout_slave_quit(const struct lu_env *env, struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
4060
4061 static int lfsck_layout_master_in_notify(const struct lu_env *env,
4062                                          struct lfsck_component *com,
4063                                          struct lfsck_request *lr)
4064 {
4065         struct lfsck_instance           *lfsck = com->lc_lfsck;
4066         struct lfsck_layout             *lo    = com->lc_file_ram;
4067         struct lfsck_layout_master_data *llmd  = com->lc_data;
4068         struct lfsck_tgt_descs          *ltds;
4069         struct lfsck_tgt_desc           *ltd;
4070         bool                             fail  = false;
4071         ENTRY;
4072
4073         if (lr->lr_event != LE_PHASE1_DONE &&
4074             lr->lr_event != LE_PHASE2_DONE &&
4075             lr->lr_event != LE_PEER_EXIT)
4076                 RETURN(-EINVAL);
4077
4078         if (lr->lr_flags & LEF_FROM_OST)
4079                 ltds = &lfsck->li_ost_descs;
4080         else
4081                 ltds = &lfsck->li_mdt_descs;
4082         spin_lock(&ltds->ltd_lock);
4083         ltd = LTD_TGT(ltds, lr->lr_index);
4084         if (ltd == NULL) {
4085                 spin_unlock(&ltds->ltd_lock);
4086
4087                 RETURN(-ENODEV);
4088         }
4089
4090         list_del_init(&ltd->ltd_layout_phase_list);
4091         switch (lr->lr_event) {
4092         case LE_PHASE1_DONE:
4093                 if (lr->lr_status <= 0) {
4094                         ltd->ltd_layout_done = 1;
4095                         list_del_init(&ltd->ltd_layout_list);
4096                         CWARN("%s: %s %x failed/stopped at phase1: rc = %d.\n",
4097                               lfsck_lfsck2name(lfsck),
4098                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4099                               ltd->ltd_index, lr->lr_status);
4100                         lo->ll_flags |= LF_INCOMPLETE;
4101                         fail = true;
4102                         break;
4103                 }
4104
4105                 if (lr->lr_flags & LEF_FROM_OST) {
4106                         if (list_empty(&ltd->ltd_layout_list))
4107                                 list_add_tail(&ltd->ltd_layout_list,
4108                                               &llmd->llmd_ost_list);
4109                         list_add_tail(&ltd->ltd_layout_phase_list,
4110                                       &llmd->llmd_ost_phase2_list);
4111                 } else {
4112                         if (list_empty(&ltd->ltd_layout_list))
4113                                 list_add_tail(&ltd->ltd_layout_list,
4114                                               &llmd->llmd_mdt_list);
4115                         list_add_tail(&ltd->ltd_layout_phase_list,
4116                                       &llmd->llmd_mdt_phase2_list);
4117                 }
4118                 break;
4119         case LE_PHASE2_DONE:
4120                 ltd->ltd_layout_done = 1;
4121                 list_del_init(&ltd->ltd_layout_list);
4122                 break;
4123         case LE_PEER_EXIT:
4124                 fail = true;
4125                 ltd->ltd_layout_done = 1;
4126                 list_del_init(&ltd->ltd_layout_list);
4127                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
4128                         CWARN("%s: the peer %s %x exit layout LFSCK.\n",
4129                               lfsck_lfsck2name(lfsck),
4130                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4131                               ltd->ltd_index);
4132                         lo->ll_flags |= LF_INCOMPLETE;
4133                 }
4134                 break;
4135         default:
4136                 break;
4137         }
4138         spin_unlock(&ltds->ltd_lock);
4139
4140         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4141                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4142
4143                 memset(stop, 0, sizeof(*stop));
4144                 stop->ls_status = lr->lr_status;
4145                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4146                 lfsck_stop(env, lfsck->li_bottom, stop);
4147         } else if (lfsck_layout_master_to_orphan(llmd)) {
4148                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
4149         }
4150
4151         RETURN(0);
4152 }
4153
4154 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
4155                                         struct lfsck_component *com,
4156                                         struct lfsck_request *lr)
4157 {
4158         struct lfsck_instance            *lfsck = com->lc_lfsck;
4159         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4160         struct lfsck_layout_slave_target *llst;
4161         ENTRY;
4162
4163         if (lr->lr_event == LE_FID_ACCESSED) {
4164                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
4165
4166                 RETURN(0);
4167         }
4168
4169         if (lr->lr_event != LE_PHASE2_DONE && lr->lr_event != LE_PEER_EXIT)
4170                 RETURN(-EINVAL);
4171
4172         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
4173         if (llst == NULL)
4174                 RETURN(-ENODEV);
4175
4176         lfsck_layout_llst_put(llst);
4177         if (list_empty(&llsd->llsd_master_list))
4178                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
4179
4180         if (lr->lr_event == LE_PEER_EXIT &&
4181             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
4182                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
4183
4184                 memset(stop, 0, sizeof(*stop));
4185                 stop->ls_status = lr->lr_status;
4186                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
4187                 lfsck_stop(env, lfsck->li_bottom, stop);
4188         }
4189
4190         RETURN(0);
4191 }
4192
4193 static int lfsck_layout_query(const struct lu_env *env,
4194                               struct lfsck_component *com)
4195 {
4196         struct lfsck_layout *lo = com->lc_file_ram;
4197
4198         return lo->ll_status;
4199 }
4200
4201 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
4202                                            struct lfsck_component *com,
4203                                            struct lfsck_tgt_descs *ltds,
4204                                            struct lfsck_tgt_desc *ltd,
4205                                            struct ptlrpc_request_set *set)
4206 {
4207         struct lfsck_thread_info          *info  = lfsck_env_info(env);
4208         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
4209         struct lfsck_request              *lr    = &info->lti_lr;
4210         struct lfsck_instance             *lfsck = com->lc_lfsck;
4211         int                                rc;
4212
4213         spin_lock(&ltds->ltd_lock);
4214         if (list_empty(&ltd->ltd_layout_list)) {
4215                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
4216                 spin_unlock(&ltds->ltd_lock);
4217
4218                 return 0;
4219         }
4220
4221         list_del_init(&ltd->ltd_layout_phase_list);
4222         list_del_init(&ltd->ltd_layout_list);
4223         spin_unlock(&ltds->ltd_lock);
4224
4225         memset(lr, 0, sizeof(*lr));
4226         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4227         lr->lr_event = LE_PEER_EXIT;
4228         lr->lr_active = LT_LAYOUT;
4229         lr->lr_status = LS_CO_PAUSED;
4230         if (ltds == &lfsck->li_ost_descs)
4231                 lr->lr_flags = LEF_TO_OST;
4232
4233         laia->laia_com = com;
4234         laia->laia_ltds = ltds;
4235         atomic_inc(&ltd->ltd_ref);
4236         laia->laia_ltd = ltd;
4237         laia->laia_lr = lr;
4238         laia->laia_shared = 0;
4239
4240         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
4241                                  lfsck_layout_master_async_interpret,
4242                                  laia, LFSCK_NOTIFY);
4243         if (rc != 0) {
4244                 CERROR("%s: Fail to notify %s %x for co-stop: rc = %d\n",
4245                        lfsck_lfsck2name(lfsck),
4246                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
4247                        ltd->ltd_index, rc);
4248                 lfsck_tgt_put(ltd);
4249         }
4250
4251         return rc;
4252 }
4253
4254 /* with lfsck::li_lock held */
4255 static int lfsck_layout_slave_join(const struct lu_env *env,
4256                                    struct lfsck_component *com,
4257                                    struct lfsck_start_param *lsp)
4258 {
4259         struct lfsck_instance            *lfsck = com->lc_lfsck;
4260         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4261         struct lfsck_layout_slave_target *llst;
4262         struct lfsck_start               *start = lsp->lsp_start;
4263         int                               rc    = 0;
4264         ENTRY;
4265
4266         if (!lsp->lsp_index_valid || start == NULL ||
4267             !(start->ls_flags & LPF_ALL_TGT) ||
4268             !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT))
4269                 RETURN(-EALREADY);
4270
4271         spin_unlock(&lfsck->li_lock);
4272         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4273         spin_lock(&lfsck->li_lock);
4274         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
4275                 spin_unlock(&lfsck->li_lock);
4276                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
4277                                                       true);
4278                 if (llst != NULL)
4279                         lfsck_layout_llst_put(llst);
4280                 spin_lock(&lfsck->li_lock);
4281                 rc = -EAGAIN;
4282         }
4283
4284         RETURN(rc);
4285 }
4286
4287 static struct lfsck_operations lfsck_layout_master_ops = {
4288         .lfsck_reset            = lfsck_layout_reset,
4289         .lfsck_fail             = lfsck_layout_fail,
4290         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
4291         .lfsck_prep             = lfsck_layout_master_prep,
4292         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
4293         .lfsck_exec_dir         = lfsck_layout_exec_dir,
4294         .lfsck_post             = lfsck_layout_master_post,
4295         .lfsck_interpret        = lfsck_layout_master_async_interpret,
4296         .lfsck_dump             = lfsck_layout_dump,
4297         .lfsck_double_scan      = lfsck_layout_master_double_scan,
4298         .lfsck_data_release     = lfsck_layout_master_data_release,
4299         .lfsck_quit             = lfsck_layout_master_quit,
4300         .lfsck_in_notify        = lfsck_layout_master_in_notify,
4301         .lfsck_query            = lfsck_layout_query,
4302         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
4303 };
4304
4305 static struct lfsck_operations lfsck_layout_slave_ops = {
4306         .lfsck_reset            = lfsck_layout_reset,
4307         .lfsck_fail             = lfsck_layout_fail,
4308         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
4309         .lfsck_prep             = lfsck_layout_slave_prep,
4310         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
4311         .lfsck_exec_dir         = lfsck_layout_exec_dir,
4312         .lfsck_post             = lfsck_layout_slave_post,
4313         .lfsck_dump             = lfsck_layout_dump,
4314         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
4315         .lfsck_data_release     = lfsck_layout_slave_data_release,
4316         .lfsck_quit             = lfsck_layout_slave_quit,
4317         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
4318         .lfsck_query            = lfsck_layout_query,
4319         .lfsck_join             = lfsck_layout_slave_join,
4320 };
4321
4322 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
4323 {
4324         struct lfsck_component  *com;
4325         struct lfsck_layout     *lo;
4326         struct dt_object        *root = NULL;
4327         struct dt_object        *obj;
4328         int                      rc;
4329         ENTRY;
4330
4331         OBD_ALLOC_PTR(com);
4332         if (com == NULL)
4333                 RETURN(-ENOMEM);
4334
4335         INIT_LIST_HEAD(&com->lc_link);
4336         INIT_LIST_HEAD(&com->lc_link_dir);
4337         init_rwsem(&com->lc_sem);
4338         atomic_set(&com->lc_ref, 1);
4339         com->lc_lfsck = lfsck;
4340         com->lc_type = LT_LAYOUT;
4341         if (lfsck->li_master) {
4342                 struct lfsck_layout_master_data *llmd;
4343
4344                 com->lc_ops = &lfsck_layout_master_ops;
4345                 OBD_ALLOC_PTR(llmd);
4346                 if (llmd == NULL)
4347                         GOTO(out, rc = -ENOMEM);
4348
4349                 INIT_LIST_HEAD(&llmd->llmd_req_list);
4350                 spin_lock_init(&llmd->llmd_lock);
4351                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
4352                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
4353                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
4354                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
4355                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
4356                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
4357                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
4358                 com->lc_data = llmd;
4359         } else {
4360                 struct lfsck_layout_slave_data *llsd;
4361
4362                 com->lc_ops = &lfsck_layout_slave_ops;
4363                 OBD_ALLOC_PTR(llsd);
4364                 if (llsd == NULL)
4365                         GOTO(out, rc = -ENOMEM);
4366
4367                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
4368                 INIT_LIST_HEAD(&llsd->llsd_master_list);
4369                 spin_lock_init(&llsd->llsd_lock);
4370                 llsd->llsd_rb_root = RB_ROOT;
4371                 rwlock_init(&llsd->llsd_rb_lock);
4372                 com->lc_data = llsd;
4373         }
4374         com->lc_file_size = sizeof(*lo);
4375         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
4376         if (com->lc_file_ram == NULL)
4377                 GOTO(out, rc = -ENOMEM);
4378
4379         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
4380         if (com->lc_file_disk == NULL)
4381                 GOTO(out, rc = -ENOMEM);
4382
4383         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
4384         if (IS_ERR(root))
4385                 GOTO(out, rc = PTR_ERR(root));
4386
4387         if (unlikely(!dt_try_as_dir(env, root)))
4388                 GOTO(out, rc = -ENOTDIR);
4389
4390         obj = local_file_find_or_create(env, lfsck->li_los, root,
4391                                         lfsck_layout_name,
4392                                         S_IFREG | S_IRUGO | S_IWUSR);
4393         if (IS_ERR(obj))
4394                 GOTO(out, rc = PTR_ERR(obj));
4395
4396         com->lc_obj = obj;
4397         rc = lfsck_layout_load(env, com);
4398         if (rc > 0)
4399                 rc = lfsck_layout_reset(env, com, true);
4400         else if (rc == -ENOENT)
4401                 rc = lfsck_layout_init(env, com);
4402
4403         if (rc != 0)
4404                 GOTO(out, rc);
4405
4406         lo = com->lc_file_ram;
4407         switch (lo->ll_status) {
4408         case LS_INIT:
4409         case LS_COMPLETED:
4410         case LS_FAILED:
4411         case LS_STOPPED:
4412         case LS_PARTIAL:
4413                 spin_lock(&lfsck->li_lock);
4414                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4415                 spin_unlock(&lfsck->li_lock);
4416                 break;
4417         default:
4418                 CERROR("%s: unknown lfsck_layout status: rc = %u\n",
4419                        lfsck_lfsck2name(lfsck), lo->ll_status);
4420                 /* fall through */
4421         case LS_SCANNING_PHASE1:
4422         case LS_SCANNING_PHASE2:
4423                 /* No need to store the status to disk right now.
4424                  * If the system crashed before the status stored,
4425                  * it will be loaded back when next time. */
4426                 lo->ll_status = LS_CRASHED;
4427                 lo->ll_flags |= LF_INCOMPLETE;
4428                 /* fall through */
4429         case LS_PAUSED:
4430         case LS_CRASHED:
4431         case LS_CO_FAILED:
4432         case LS_CO_STOPPED:
4433         case LS_CO_PAUSED:
4434                 spin_lock(&lfsck->li_lock);
4435                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
4436                 spin_unlock(&lfsck->li_lock);
4437                 break;
4438         }
4439
4440         if (lo->ll_flags & LF_CRASHED_LASTID) {
4441                 LASSERT(lfsck->li_out_notify != NULL);
4442
4443                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4444                                      LE_LASTID_REBUILDING);
4445         }
4446
4447         GOTO(out, rc = 0);
4448
4449 out:
4450         if (root != NULL && !IS_ERR(root))
4451                 lu_object_put(env, &root->do_lu);
4452
4453         if (rc != 0)
4454                 lfsck_component_cleanup(env, com);
4455
4456         return rc;
4457 }
4458
4459 struct lfsck_orphan_it {
4460         struct lfsck_component           *loi_com;
4461         struct lfsck_rbtree_node         *loi_lrn;
4462         struct lfsck_layout_slave_target *loi_llst;
4463         struct lu_fid                     loi_key;
4464         struct lu_orphan_rec              loi_rec;
4465         __u64                             loi_hash;
4466         unsigned int                      loi_over:1;
4467 };
4468
4469 static int lfsck_fid_match_idx(const struct lu_env *env,
4470                                struct lfsck_instance *lfsck,
4471                                const struct lu_fid *fid, int idx)
4472 {
4473         struct seq_server_site  *ss;
4474         struct lu_server_fld    *sf;
4475         struct lu_seq_range      range  = { 0 };
4476         int                      rc;
4477
4478         /* All abnormal cases will be returned to MDT0. */
4479         if (!fid_is_norm(fid)) {
4480                 if (idx == 0)
4481                         return 1;
4482
4483                 return 0;
4484         }
4485
4486         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4487         if (unlikely(ss == NULL))
4488                 return -ENOTCONN;
4489
4490         sf = ss->ss_server_fld;
4491         LASSERT(sf != NULL);
4492
4493         fld_range_set_any(&range);
4494         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
4495         if (rc != 0)
4496                 return rc;
4497
4498         if (!fld_range_is_mdt(&range))
4499                 return -EINVAL;
4500
4501         if (range.lsr_index == idx)
4502                 return 1;
4503
4504         return 0;
4505 }
4506
4507 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
4508                                         struct dt_device *dev,
4509                                         struct dt_object *obj)
4510 {
4511         struct thandle *handle;
4512         int             rc;
4513         ENTRY;
4514
4515         handle = dt_trans_create(env, dev);
4516         if (IS_ERR(handle))
4517                 RETURN_EXIT;
4518
4519         rc = dt_declare_ref_del(env, obj, handle);
4520         if (rc != 0)
4521                 GOTO(stop, rc);
4522
4523         rc = dt_declare_destroy(env, obj, handle);
4524         if (rc != 0)
4525                 GOTO(stop, rc);
4526
4527         rc = dt_trans_start_local(env, dev, handle);
4528         if (rc != 0)
4529                 GOTO(stop, rc);
4530
4531         dt_write_lock(env, obj, 0);
4532         rc = dt_ref_del(env, obj, handle);
4533         if (rc == 0)
4534                 rc = dt_destroy(env, obj, handle);
4535         dt_write_unlock(env, obj);
4536
4537         GOTO(stop, rc);
4538
4539 stop:
4540         dt_trans_stop(env, dev, handle);
4541
4542         RETURN_EXIT;
4543 }
4544
4545 static int lfsck_orphan_index_lookup(const struct lu_env *env,
4546                                      struct dt_object *dt,
4547                                      struct dt_rec *rec,
4548                                      const struct dt_key *key,
4549                                      struct lustre_capa *capa)
4550 {
4551         return -EOPNOTSUPP;
4552 }
4553
4554 static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
4555                                              struct dt_object *dt,
4556                                              const struct dt_rec *rec,
4557                                              const struct dt_key *key,
4558                                              struct thandle *handle)
4559 {
4560         return -EOPNOTSUPP;
4561 }
4562
4563 static int lfsck_orphan_index_insert(const struct lu_env *env,
4564                                      struct dt_object *dt,
4565                                      const struct dt_rec *rec,
4566                                      const struct dt_key *key,
4567                                      struct thandle *handle,
4568                                      struct lustre_capa *capa,
4569                                      int ignore_quota)
4570 {
4571         return -EOPNOTSUPP;
4572 }
4573
4574 static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
4575                                              struct dt_object *dt,
4576                                              const struct dt_key *key,
4577                                              struct thandle *handle)
4578 {
4579         return -EOPNOTSUPP;
4580 }
4581
4582 static int lfsck_orphan_index_delete(const struct lu_env *env,
4583                                      struct dt_object *dt,
4584                                      const struct dt_key *key,
4585                                      struct thandle *handle,
4586                                      struct lustre_capa *capa)
4587 {
4588         return -EOPNOTSUPP;
4589 }
4590
4591 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
4592                                           struct dt_object *dt,
4593                                           __u32 attr,
4594                                           struct lustre_capa *capa)
4595 {
4596         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
4597         struct lfsck_instance           *lfsck;
4598         struct lfsck_component          *com    = NULL;
4599         struct lfsck_layout_slave_data  *llsd;
4600         struct lfsck_orphan_it          *it     = NULL;
4601         int                              rc     = 0;
4602         ENTRY;
4603
4604         lfsck = lfsck_instance_find(dev, true, false);
4605         if (unlikely(lfsck == NULL))
4606                 RETURN(ERR_PTR(-ENODEV));
4607
4608         com = lfsck_component_find(lfsck, LT_LAYOUT);
4609         if (unlikely(com == NULL))
4610                 GOTO(out, rc = -ENOENT);
4611
4612         llsd = com->lc_data;
4613         if (!llsd->llsd_rbtree_valid)
4614                 GOTO(out, rc = -ESRCH);
4615
4616         OBD_ALLOC_PTR(it);
4617         if (it == NULL)
4618                 GOTO(out, rc = -ENOMEM);
4619
4620         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
4621         if (it->loi_llst == NULL)
4622                 GOTO(out, rc = -ENODEV);
4623
4624         if (dev->dd_record_fid_accessed) {
4625                 /* The first iteratino against the rbtree, scan the whole rbtree
4626                  * to remove the nodes which do NOT need to be handled. */
4627                 write_lock(&llsd->llsd_rb_lock);
4628                 if (dev->dd_record_fid_accessed) {
4629                         struct rb_node                  *node;
4630                         struct rb_node                  *next;
4631                         struct lfsck_rbtree_node        *lrn;
4632
4633                         /* No need to record the fid accessing anymore. */
4634                         dev->dd_record_fid_accessed = 0;
4635
4636                         node = rb_first(&llsd->llsd_rb_root);
4637                         while (node != NULL) {
4638                                 next = rb_next(node);
4639                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
4640                                                lrn_node);
4641                                 if (atomic_read(&lrn->lrn_known_count) <=
4642                                     atomic_read(&lrn->lrn_accessed_count)) {
4643                                         rb_erase(node, &llsd->llsd_rb_root);
4644                                         lfsck_rbtree_free(lrn);
4645                                 }
4646                                 node = next;
4647                         }
4648                 }
4649                 write_unlock(&llsd->llsd_rb_lock);
4650         }
4651
4652         /* read lock the rbtree when init, and unlock when fini */
4653         read_lock(&llsd->llsd_rb_lock);
4654         it->loi_com = com;
4655         com = NULL;
4656
4657         GOTO(out, rc = 0);
4658
4659 out:
4660         if (com != NULL)
4661                 lfsck_component_put(env, com);
4662         lfsck_instance_put(env, lfsck);
4663         if (rc != 0) {
4664                 if (it != NULL)
4665                         OBD_FREE_PTR(it);
4666
4667                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
4668         }
4669
4670         return (struct dt_it *)it;
4671 }
4672
4673 static void lfsck_orphan_it_fini(const struct lu_env *env,
4674                                  struct dt_it *di)
4675 {
4676         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
4677         struct lfsck_component           *com   = it->loi_com;
4678         struct lfsck_layout_slave_data   *llsd;
4679         struct lfsck_layout_slave_target *llst;
4680
4681         if (com != NULL) {
4682                 llsd = com->lc_data;
4683                 read_unlock(&llsd->llsd_rb_lock);
4684                 llst = it->loi_llst;
4685                 LASSERT(llst != NULL);
4686
4687                 /* Save the key and hash for iterate next. */
4688                 llst->llst_fid = it->loi_key;
4689                 llst->llst_hash = it->loi_hash;
4690                 lfsck_layout_llst_put(llst);
4691                 lfsck_component_put(env, com);
4692         }
4693         OBD_FREE_PTR(it);
4694 }
4695
/**
 * Move the iterator to the next orphan OST-object to be reported.
 *
 * Starting from the current key, walk the rbtree nodes and, inside each
 * node, the bits that are set in the "known" bitmap but not set in the
 * "accessed" bitmap. For every such candidate the object is looked up
 * and its parent FID xattr (XATTR_NAME_FID) is read; the candidate is
 * returned only if it claims (or may claim) the MDT that this iteration
 * serves, otherwise it is skipped and the scan continues.
 *
 * On success the key (it->loi_key) and the record (it->loi_rec) describe
 * the found orphan and it->loi_hash is increased by one.
 *
 * NOTE(review): lfsck_orphan_it_init() leaves llsd_rb_lock read-locked
 * until fini, so the object and transaction operations below appear to
 * run with that lock held - confirm they may not sleep.
 *
 * \param[in] env       execution environment
 * \param[in] di        the iterator
 *
 * \retval       +1: the iteration finished
 * \retval        0: on success, not finished
 * \retval      -ve: on error
 */
static int lfsck_orphan_it_next(const struct lu_env *env,
                                struct dt_it *di)
{
        struct lfsck_thread_info        *info   = lfsck_env_info(env);
        struct filter_fid_old           *pfid   = &info->lti_old_pfid;
        struct lu_attr                  *la     = &info->lti_la;
        struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
        struct lu_fid                   *key    = &it->loi_key;
        struct lu_orphan_rec            *rec    = &it->loi_rec;
        struct lfsck_component          *com    = it->loi_com;
        struct lfsck_instance           *lfsck  = com->lc_lfsck;
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct dt_object                *obj;
        struct lfsck_rbtree_node        *lrn;
        int                              pos;
        int                              rc;
        __u32                            save;
        __u32                            idx    = it->loi_llst->llst_index;
        bool                             exact  = false;
        ENTRY;

        /* The end has been reached by a former call. */
        if (it->loi_over)
                RETURN(1);

        /* Step 1: locate the rbtree node that covers the current key. */
again0:
        lrn = it->loi_lrn;
        if (lrn == NULL) {
                /* No cached node: search the rbtree with the key. */
                lrn = lfsck_rbtree_search(llsd, key, &exact);
                if (lrn == NULL) {
                        it->loi_over = 1;
                        RETURN(1);
                }

                it->loi_lrn = lrn;
                /* Not an exact match: restart from the first object ID
                 * of the returned node. */
                if (!exact) {
                        key->f_seq = lrn->lrn_seq;
                        key->f_oid = lrn->lrn_first_oid;
                        key->f_ver = 0;
                }
        } else {
                /* Advance by one object ID inside the cached node. */
                key->f_oid++;
                /* The object ID wrapped around: go to the next sequence
                 * and search again. */
                if (unlikely(key->f_oid == 0)) {
                        key->f_seq++;
                        it->loi_lrn = NULL;
                        goto again0;
                }

                /* Beyond the range covered by this node: search again. */
                if (key->f_oid >=
                    lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
                        it->loi_lrn = NULL;
                        goto again0;
                }
        }

        /* If every known object in this node has been accessed, then move
         * forward to the first following node that still has candidates. */
        if (unlikely(atomic_read(&lrn->lrn_known_count) <=
                     atomic_read(&lrn->lrn_accessed_count))) {
                struct rb_node *next = rb_next(&lrn->lrn_node);

                while (next != NULL) {
                        lrn = rb_entry(next, struct lfsck_rbtree_node,
                                       lrn_node);
                        if (atomic_read(&lrn->lrn_known_count) >
                            atomic_read(&lrn->lrn_accessed_count))
                                break;
                        next = rb_next(next);
                }

                /* No more node with candidates: the iteration is over. */
                if (next == NULL) {
                        it->loi_over = 1;
                        RETURN(1);
                }

                it->loi_lrn = lrn;
                key->f_seq = lrn->lrn_seq;
                key->f_oid = lrn->lrn_first_oid;
                key->f_ver = 0;
        }

        /* Bit offset of the current key inside the node's bitmaps. */
        pos = key->f_oid - lrn->lrn_first_oid;

        /* Step 2: scan the bitmaps of the node from 'pos' for a candidate. */
again1:
        pos = find_next_bit(lrn->lrn_known_bitmap,
                            LFSCK_RBTREE_BITMAP_WIDTH, pos);
        if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
                /* This node is exhausted: move the key right after its
                 * range (handling the object ID overflow) and go back to
                 * the node search. */
                key->f_oid = lrn->lrn_first_oid + pos;
                if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
                        key->f_seq++;
                        key->f_oid = 0;
                }
                it->loi_lrn = NULL;
                goto again0;
        }

        /* Skip the object that is marked in the accessed bitmap. */
        if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
                pos++;
                goto again1;
        }

        /* Step 3: check the candidate object itself. */
        key->f_oid = lrn->lrn_first_oid + pos;
        obj = lfsck_object_find(env, lfsck, key);
        if (IS_ERR(obj)) {
                rc = PTR_ERR(obj);
                /* The object is gone: try the next candidate. */
                if (rc == -ENOENT) {
                        pos++;
                        goto again1;
                }
                RETURN(rc);
        }

        /* From here on the object is read-locked and referenced; every
         * path below must unlock and put it. */
        dt_read_lock(env, obj, 0);
        if (!dt_object_exists(obj)) {
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
        if (rc != 0)
                GOTO(out, rc);

        /* Read the parent FID information stored on the OST-object. */
        rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
                          XATTR_NAME_FID, BYPASS_CAPA);
        if (rc == -ENODATA) {
                /* For the pre-created OST-object, update the bitmap to avoid
                 * others LFSCK (second phase) iteration to touch it again. */
                if (la->la_ctime == 0) {
                        if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
                                atomic_inc(&lrn->lrn_accessed_count);

                        /* For the race between repairing dangling referenced
                         * MDT-object and unlink the file, it may left orphan
                         * OST-object there. Destroy it now! */
                        /* The read lock is dropped before destroying, the
                         * destroy helper takes the write lock itself. */
                        if (unlikely(!(la->la_mode & S_ISUID))) {
                                dt_read_unlock(env, obj);
                                lfsck_layout_destroy_orphan(env,
                                                            lfsck->li_bottom,
                                                            obj);
                                lfsck_object_put(env, obj);
                                pos++;
                                goto again1;
                        }
                } else if (idx == 0) {
                        /* If the orphan OST-object has no parent information,
                         * regard it as referenced by the MDT-object on MDT0. */
                        fid_zero(&rec->lor_fid);
                        rec->lor_uid = la->la_uid;
                        rec->lor_gid = la->la_gid;
                        GOTO(out, rc = 0);
                }

                /* Nothing to report for this object: next candidate. */
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        if (rc < 0)
                GOTO(out, rc);

        /* Only the two known sizes of the parent FID xattr are accepted. */
        if (rc != sizeof(struct filter_fid) &&
            rc != sizeof(struct filter_fid_old))
                GOTO(out, rc = -EINVAL);

        fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
        /* In fact, the ff_parent::f_ver is not the real parent FID::f_ver,
         * instead, it is the OST-object index in its parent MDT-object
         * layout EA. */
        save = rec->lor_fid.f_ver;
        rec->lor_fid.f_ver = 0;
        rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
        /* If the orphan OST-object does not claim the MDT, then next.
         *
         * If we do not know whether it matches or not, then return it
         * to the MDT for further check. */
        if (rc == 0) {
                dt_read_unlock(env, obj);
                lfsck_object_put(env, obj);
                pos++;
                goto again1;
        }

        /* Restore the saved f_ver so that it is handed back to the
         * caller together with the owner. */
        rec->lor_fid.f_ver = save;
        rec->lor_uid = la->la_uid;
        rec->lor_gid = la->la_gid;

        CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
               lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
               rec->lor_uid, rec->lor_gid);

        GOTO(out, rc = 0);

out:
        dt_read_unlock(env, obj);
        lfsck_object_put(env, obj);
        /* Only a successfully returned orphan advances the hash. */
        if (rc == 0)
                it->loi_hash++;

        return rc;
}
4901
4902 /**
4903  * \retval       +1: locate to the exactly position
4904  * \retval        0: cannot locate to the exactly position,
4905  *                   call next() to move to a valid position.
4906  * \retval      -ve: on error
4907  */
4908 static int lfsck_orphan_it_get(const struct lu_env *env,
4909                                struct dt_it *di,
4910                                const struct dt_key *key)
4911 {
4912         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
4913         int                      rc;
4914
4915         it->loi_key = *(struct lu_fid *)key;
4916         rc = lfsck_orphan_it_next(env, di);
4917         if (rc == 1)
4918                 return 0;
4919
4920         if (rc == 0)
4921                 return 1;
4922
4923         return rc;
4924 }
4925
/*
 * Counterpart of lfsck_orphan_it_get(); intentionally does nothing.
 */
static void lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
}
4930
4931 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
4932                                           const struct dt_it *di)
4933 {
4934         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
4935
4936         return (struct dt_key *)&it->loi_key;
4937 }
4938
4939 static int lfsck_orphan_it_key_size(const struct lu_env *env,
4940                                     const struct dt_it *di)
4941 {
4942         return sizeof(struct lu_fid);
4943 }
4944
4945 static int lfsck_orphan_it_rec(const struct lu_env *env,
4946                                const struct dt_it *di,
4947                                struct dt_rec *rec,
4948                                __u32 attr)
4949 {
4950         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
4951
4952         *(struct lu_orphan_rec *)rec = it->loi_rec;
4953
4954         return 0;
4955 }
4956
4957 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
4958                                    const struct dt_it *di)
4959 {
4960         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
4961
4962         return it->loi_hash;
4963 }
4964
4965 /**
4966  * \retval       +1: locate to the exactly position
4967  * \retval        0: cannot locate to the exactly position,
4968  *                   call next() to move to a valid position.
4969  * \retval      -ve: on error
4970  */
4971 static int lfsck_orphan_it_load(const struct lu_env *env,
4972                                 const struct dt_it *di,
4973                                 __u64 hash)
4974 {
4975         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
4976         struct lfsck_layout_slave_target *llst = it->loi_llst;
4977         int                               rc;
4978
4979         LASSERT(llst != NULL);
4980
4981         if (hash != llst->llst_hash) {
4982                 CWARN("%s: the given hash "LPU64" for orphan iteration does "
4983                       "not match the one when fini "LPU64", to be reset.\n",
4984                       lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
4985                       llst->llst_hash);
4986                 fid_zero(&llst->llst_fid);
4987                 llst->llst_hash = 0;
4988         }
4989
4990         it->loi_key = llst->llst_fid;
4991         it->loi_hash = llst->llst_hash;
4992         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
4993         if (rc == 1)
4994                 return 0;
4995
4996         if (rc == 0)
4997                 return 1;
4998
4999         return rc;
5000 }
5001
/*
 * Nothing is filled into \a key_rec for the orphan iteration.
 *
 * \retval 0 always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
                                   const struct dt_it *di, void *key_rec)
{
        return 0;
}
5008
5009 const struct dt_index_operations lfsck_orphan_index_ops = {
5010         .dio_lookup             = lfsck_orphan_index_lookup,
5011         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
5012         .dio_insert             = lfsck_orphan_index_insert,
5013         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
5014         .dio_delete             = lfsck_orphan_index_delete,
5015         .dio_it = {
5016                 .init           = lfsck_orphan_it_init,
5017                 .fini           = lfsck_orphan_it_fini,
5018                 .get            = lfsck_orphan_it_get,
5019                 .put            = lfsck_orphan_it_put,
5020                 .next           = lfsck_orphan_it_next,
5021                 .key            = lfsck_orphan_it_key,
5022                 .key_size       = lfsck_orphan_it_key_size,
5023                 .rec            = lfsck_orphan_it_rec,
5024                 .store          = lfsck_orphan_it_store,
5025                 .load           = lfsck_orphan_it_load,
5026                 .key_rec        = lfsck_orphan_it_key_rec,
5027         }
5028 };