Whamcloud - gitweb
LU-4788 lfsck: take ldlm lock before modifying visible object
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic stored in lfsck_layout::ll_magic by lfsck_layout_init() and checked
 * by lfsck_layout_load() to detect a broken lfsck_layout record. */
51 #define LFSCK_LAYOUT_MAGIC              0xB173AE14
52
/* Name string of the layout LFSCK component (its users are outside this
 * part of the file). */
53 static const char lfsck_layout_name[] = "lfsck_layout";
54
/* Tracking record for one FID sequence. Records are linked into
 * lfsck_layout_slave_data::llsd_seq_list, which lfsck_layout_seq_insert()
 * keeps in ascending lls_seq order. */
55 struct lfsck_layout_seq {
        /* link into lfsck_layout_slave_data::llsd_seq_list */
56         struct list_head         lls_list;
        /* the sequence this record describes; also the list sort key */
57         __u64                    lls_seq;
        /* NOTE(review): presumably the LAST_ID value of the sequence and the
         * largest object ID seen so far -- usage is not visible here, confirm
         * against the callers. */
58         __u64                    lls_lastid;
59         __u64                    lls_lastid_known;
        /* NOTE(review): presumably the object holding LAST_ID -- confirm. */
60         struct dt_object        *lls_lastid_obj;
        /* NOTE(review): presumably set when lls_lastid needs to be written
         * back -- confirm. */
61         unsigned int             lls_dirty:1;
62 };
63
/* Reference-counted record for one master target, identified by llst_index.
 * It is created with one reference by lfsck_layout_llst_add() and freed by
 * lfsck_layout_llst_put() when the last reference is dropped. */
64 struct lfsck_layout_slave_target {
65         /* Link into lfsck_layout_slave_data::llsd_master_list. */
66         struct list_head        llst_list;
67         /* The position of the next record in the rbtree for iteration. */
68         struct lu_fid           llst_fid;
69         /* Dummy hash for iteration against the rbtree. */
70         __u64                   llst_hash;
        /* Generation stamp, zeroed on creation. NOTE(review): presumably
         * compared with llsd_touch_gen -- confirm against the users. */
71         __u64                   llst_gen;
        /* Reference count; the record is freed when it reaches zero. */
72         atomic_t                llst_ref;
        /* Target index; unique within llsd_master_list. */
73         __u32                   llst_index;
74 };
75
/* Slave-side private data of the layout LFSCK component (reached through
 * lfsck_component::lc_data in the rbtree helpers below). */
76 struct lfsck_layout_slave_data {
77         /* List of lfsck_layout_seq records, sorted by lls_seq. */
78         struct list_head         llsd_seq_list;
79
80         /* List of the masters involved in layout verification. */
81         struct list_head         llsd_master_list;
        /* Protects llsd_master_list. */
82         spinlock_t               llsd_lock;
83         __u64                    llsd_touch_gen;
        /* In-RAM object that stands for the rbtree, see
         * lfsck_rbtree_setup(). */
84         struct dt_object        *llsd_rb_obj;
        /* Root of the tree of lfsck_rbtree_node bitmaps. */
85         struct rb_root           llsd_rb_root;
        /* Taken around rbtree lookups/inserts and around changes of
         * llsd_rbtree_valid. */
86         rwlock_t                 llsd_rb_lock;
        /* Cleared by lfsck_rbtree_cleanup(); once clear, the rbtree must no
         * longer be used. */
87         unsigned int             llsd_rbtree_valid:1;
88 };
89
/* Reference-counted wrapper around a dt_object together with the attributes
 * and layout generation sampled by lfsck_layout_object_init(). */
90 struct lfsck_layout_object {
        /* Wrapped object; a reference on it is held while the wrapper lives. */
91         struct dt_object        *llo_obj;
        /* Attributes fetched with dt_attr_get() at init time. */
92         struct lu_attr           llo_attr;
        /* Reference count; each lfsck_layout_req holds one on its parent. */
93         atomic_t                 llo_ref;
        /* Layout generation recorded at init time, used to detect layout
         * changes between pre-fetching and verification. */
94         __u16                    llo_gen;
95 };
96
/* One queued request pairing a parent object with one of its children,
 * built by lfsck_layout_req_init() and released by lfsck_layout_req_fini(). */
97 struct lfsck_layout_req {
        /* NOTE(review): presumably links into
         * lfsck_layout_master_data::llmd_req_list -- confirm. */
98         struct list_head                 llr_list;
        /* Parent wrapper; the request holds one llo_ref on it. */
99         struct lfsck_layout_object      *llr_parent;
        /* Child object; its reference is dropped in lfsck_layout_req_fini(). */
100         struct dt_object                *llr_child;
101         __u32                            llr_ost_idx;
102         __u32                            llr_lov_idx; /* offset in LOV EA */
103 };
104
/* Master-side private data of the layout LFSCK component. */
105 struct lfsck_layout_master_data {
        /* Protects llmd_req_list (see lfsck_layout_req_empty()). */
106         spinlock_t              llmd_lock;
        /* List of pending requests. */
107         struct list_head        llmd_req_list;
108
109         /* List of the OST targets involved in layout verification. */
110         struct list_head        llmd_ost_list;
111
112         /* List of the OST targets in phase1 scanning. */
113         struct list_head        llmd_ost_phase1_list;
114
115         /* List of the OST targets in phase2 scanning. */
116         struct list_head        llmd_ost_phase2_list;
117
118         /* List of the MDT targets involved in layout verification. */
119         struct list_head        llmd_mdt_list;
120
121         /* List of the MDT targets in phase1 scanning. */
122         struct list_head        llmd_mdt_phase1_list;
123
124         /* List of the MDT targets in phase2 scanning. */
125         struct list_head        llmd_mdt_phase2_list;
126
        /* NOTE(review): the fields below are not used in the visible part of
         * the file; by their names they describe the assistant thread and its
         * state -- confirm against the users. */
127         struct ptlrpc_thread    llmd_thread;
128         __u32                   llmd_touch_gen;
129         int                     llmd_prefetched;
130         int                     llmd_assistant_status;
131         int                     llmd_post_result;
132         unsigned int            llmd_to_post:1,
133                                 llmd_to_double_scan:1,
134                                 llmd_in_double_scan:1,
135                                 llmd_exit:1;
136 };
137
/* Argument bundle for a slave-side asynchronous operation.
 * NOTE(review): the users are outside this part of the file; by the field
 * types it carries the export, the LFSCK component and the master target
 * record the operation refers to -- confirm against the callers. */
138 struct lfsck_layout_slave_async_args {
139         struct obd_export                *llsaa_exp;
140         struct lfsck_component           *llsaa_com;
141         struct lfsck_layout_slave_target *llsaa_llst;
142 };
143
144 static struct lfsck_layout_object *
145 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
146                          __u16 gen)
147 {
148         struct lfsck_layout_object *llo;
149         int                         rc;
150
151         OBD_ALLOC_PTR(llo);
152         if (llo == NULL)
153                 return ERR_PTR(-ENOMEM);
154
155         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
156         if (rc != 0) {
157                 OBD_FREE_PTR(llo);
158
159                 return ERR_PTR(rc);
160         }
161
162         lu_object_get(&obj->do_lu);
163         llo->llo_obj = obj;
164         /* The gen can be used to check whether some others have changed the
165          * file layout after LFSCK pre-fetching but before real verification. */
166         llo->llo_gen = gen;
167         atomic_set(&llo->llo_ref, 1);
168
169         return llo;
170 }
171
172 static inline void
173 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
174 {
175         if (atomic_dec_and_test(&llst->llst_ref)) {
176                 LASSERT(list_empty(&llst->llst_list));
177
178                 OBD_FREE_PTR(llst);
179         }
180 }
181
182 static inline int
183 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
184 {
185         struct lfsck_layout_slave_target *llst;
186         struct lfsck_layout_slave_target *tmp;
187         int                               rc   = 0;
188
189         OBD_ALLOC_PTR(llst);
190         if (llst == NULL)
191                 return -ENOMEM;
192
193         INIT_LIST_HEAD(&llst->llst_list);
194         llst->llst_gen = 0;
195         llst->llst_index = index;
196         atomic_set(&llst->llst_ref, 1);
197
198         spin_lock(&llsd->llsd_lock);
199         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
200                 if (tmp->llst_index == index) {
201                         rc = -EALREADY;
202                         break;
203                 }
204         }
205         if (rc == 0)
206                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
207         spin_unlock(&llsd->llsd_lock);
208
209         if (rc != 0)
210                 OBD_FREE_PTR(llst);
211
212         return rc;
213 }
214
215 static inline void
216 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
217                       struct lfsck_layout_slave_target *llst)
218 {
219         bool del = false;
220
221         spin_lock(&llsd->llsd_lock);
222         if (!list_empty(&llst->llst_list)) {
223                 list_del_init(&llst->llst_list);
224                 del = true;
225         }
226         spin_unlock(&llsd->llsd_lock);
227
228         if (del)
229                 lfsck_layout_llst_put(llst);
230 }
231
232 static inline struct lfsck_layout_slave_target *
233 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
234                                __u32 index, bool unlink)
235 {
236         struct lfsck_layout_slave_target *llst;
237
238         spin_lock(&llsd->llsd_lock);
239         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
240                 if (llst->llst_index == index) {
241                         if (unlink)
242                                 list_del_init(&llst->llst_list);
243                         else
244                                 atomic_inc(&llst->llst_ref);
245                         spin_unlock(&llsd->llsd_lock);
246
247                         return llst;
248                 }
249         }
250         spin_unlock(&llsd->llsd_lock);
251
252         return NULL;
253 }
254
255 static inline void lfsck_layout_object_put(const struct lu_env *env,
256                                            struct lfsck_layout_object *llo)
257 {
258         if (atomic_dec_and_test(&llo->llo_ref)) {
259                 lfsck_object_put(env, llo->llo_obj);
260                 OBD_FREE_PTR(llo);
261         }
262 }
263
264 static struct lfsck_layout_req *
265 lfsck_layout_req_init(struct lfsck_layout_object *parent,
266                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
267 {
268         struct lfsck_layout_req *llr;
269
270         OBD_ALLOC_PTR(llr);
271         if (llr == NULL)
272                 return ERR_PTR(-ENOMEM);
273
274         INIT_LIST_HEAD(&llr->llr_list);
275         atomic_inc(&parent->llo_ref);
276         llr->llr_parent = parent;
277         llr->llr_child = child;
278         llr->llr_ost_idx = ost_idx;
279         llr->llr_lov_idx = lov_idx;
280
281         return llr;
282 }
283
284 static inline void lfsck_layout_req_fini(const struct lu_env *env,
285                                          struct lfsck_layout_req *llr)
286 {
287         lu_object_put(env, &llr->llr_child->do_lu);
288         lfsck_layout_object_put(env, llr->llr_parent);
289         OBD_FREE_PTR(llr);
290 }
291
292 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
293 {
294         bool empty = false;
295
296         spin_lock(&llmd->llmd_lock);
297         if (list_empty(&llmd->llmd_req_list))
298                 empty = true;
299         spin_unlock(&llmd->llmd_lock);
300
301         return empty;
302 }
303
304 static int lfsck_layout_get_lovea(const struct lu_env *env,
305                                   struct dt_object *obj, struct lu_buf *buf)
306 {
307         int rc;
308
309 again:
310         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
311         if (rc == -ERANGE) {
312                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
313                                   BYPASS_CAPA);
314                 if (rc <= 0)
315                         return rc;
316
317                 lu_buf_realloc(buf, rc);
318                 if (buf->lb_buf == NULL)
319                         return -ENOMEM;
320
321                 goto again;
322         }
323
324         if (rc == -ENODATA)
325                 rc = 0;
326
327         if (rc <= 0)
328                 return rc;
329
330         if (unlikely(buf->lb_buf == NULL)) {
331                 lu_buf_alloc(buf, rc);
332                 if (buf->lb_buf == NULL)
333                         return -ENOMEM;
334
335                 goto again;
336         }
337
338         return rc;
339 }
340
341 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
342 {
343         __u32 magic;
344         __u32 pattern;
345
346         magic = le32_to_cpu(lmm->lmm_magic);
347         /* If magic crashed, keep it there. Sometime later, during OST-object
348          * orphan handling, if some OST-object(s) back-point to it, it can be
349          * verified and repaired. */
350         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
351                 struct ost_id   oi;
352                 int             rc;
353
354                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
355                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
356                         rc = -EOPNOTSUPP;
357                 else
358                         rc = -EINVAL;
359
360                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
361                        rc == -EINVAL ? "Unknown" : "Unsupported",
362                        magic, POSTID(&oi));
363
364                 return rc;
365         }
366
367         pattern = le32_to_cpu(lmm->lmm_pattern);
368         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
369         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
370                 struct ost_id oi;
371
372                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
373                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
374                        pattern, POSTID(&oi));
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
/* Geometry of the per-node bitmaps of the layout rbtree: */
/* size of one bitmap in bytes (one page) */
382 #define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
/* number of bits, i.e. of consecutive OIDs, covered by one bitmap */
383 #define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
/* mask extracting an OID's bit index within its bitmap */
384 #define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
385
/* One node of the layout rbtree. It covers the LFSCK_RBTREE_BITMAP_WIDTH
 * consecutive OIDs starting at lrn_first_oid within sequence lrn_seq, and
 * keeps one "known" and one "accessed" bit per OID. */
386 struct lfsck_rbtree_node {
        /* link into lfsck_layout_slave_data::llsd_rb_root */
387         struct rb_node   lrn_node;
        /* FID sequence covered by this node */
388         __u64            lrn_seq;
        /* first OID covered; aligned to LFSCK_RBTREE_BITMAP_WIDTH */
389         __u32            lrn_first_oid;
        /* number of bits set in lrn_known_bitmap */
390         atomic_t         lrn_known_count;
        /* number of bits set in lrn_accessed_bitmap */
391         atomic_t         lrn_accessed_count;
        /* LFSCK_RBTREE_BITMAP_SIZE bytes each, see lfsck_rbtree_new() */
392         void            *lrn_known_bitmap;
393         void            *lrn_accessed_bitmap;
394 };
395
396 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
397                                    __u64 seq, __u32 oid)
398 {
399         if (seq < lrn->lrn_seq)
400                 return -1;
401
402         if (seq > lrn->lrn_seq)
403                 return 1;
404
405         if (oid < lrn->lrn_first_oid)
406                 return -1;
407
408         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
409                 return 1;
410
411         return 0;
412 }
413
414 /* The caller should hold llsd->llsd_rb_lock. */
415 static struct lfsck_rbtree_node *
416 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
417                     const struct lu_fid *fid, bool *exact)
418 {
419         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
420         struct rb_node           *prev  = NULL;
421         struct lfsck_rbtree_node *lrn   = NULL;
422         int                       rc    = 0;
423
424         if (exact != NULL)
425                 *exact = true;
426
427         while (node != NULL) {
428                 prev = node;
429                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
430                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
431                 if (rc < 0)
432                         node = node->rb_left;
433                 else if (rc > 0)
434                         node = node->rb_right;
435                 else
436                         return lrn;
437         }
438
439         if (exact == NULL)
440                 return NULL;
441
442         /* If there is no exactly matched one, then to the next valid one. */
443         *exact = false;
444
445         /* The rbtree is empty. */
446         if (rc == 0)
447                 return NULL;
448
449         if (rc < 0)
450                 return lrn;
451
452         node = rb_next(prev);
453
454         /* The end of the rbtree. */
455         if (node == NULL)
456                 return NULL;
457
458         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
459
460         return lrn;
461 }
462
463 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
464                                                   const struct lu_fid *fid)
465 {
466         struct lfsck_rbtree_node *lrn;
467
468         OBD_ALLOC_PTR(lrn);
469         if (lrn == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
473         if (lrn->lrn_known_bitmap == NULL) {
474                 OBD_FREE_PTR(lrn);
475
476                 return ERR_PTR(-ENOMEM);
477         }
478
479         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
480         if (lrn->lrn_accessed_bitmap == NULL) {
481                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         RB_CLEAR_NODE(&lrn->lrn_node);
488         lrn->lrn_seq = fid_seq(fid);
489         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
490         atomic_set(&lrn->lrn_known_count, 0);
491         atomic_set(&lrn->lrn_accessed_count, 0);
492
493         return lrn;
494 }
495
496 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
497 {
498         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
499         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
500         OBD_FREE_PTR(lrn);
501 }
502
503 /* The caller should hold lock. */
504 static struct lfsck_rbtree_node *
505 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
506                     struct lfsck_rbtree_node *lrn)
507 {
508         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
509         struct rb_node            *parent = NULL;
510         struct lfsck_rbtree_node  *tmp;
511         int                        rc;
512
513         while (*pos != NULL) {
514                 parent = *pos;
515                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
516                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
517                 if (rc < 0)
518                         pos = &(*pos)->rb_left;
519                 else if (rc > 0)
520                         pos = &(*pos)->rb_right;
521                 else
522                         return tmp;
523         }
524
525         rb_link_node(&lrn->lrn_node, parent, pos);
526         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
527
528         return lrn;
529 }
530
/* Index operations installed by lfsck_rbtree_setup() on the in-RAM object
 * that stands for the layout rbtree; defined outside this part of the file. */
531 extern const struct dt_index_operations lfsck_orphan_index_ops;
532
533 static int lfsck_rbtree_setup(const struct lu_env *env,
534                               struct lfsck_component *com)
535 {
536         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
537         struct lfsck_instance           *lfsck  = com->lc_lfsck;
538         struct dt_device                *dev    = lfsck->li_bottom;
539         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
540         struct dt_object                *obj;
541
542         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
543         fid->f_oid = lfsck_dev_idx(dev);
544         fid->f_ver = 0;
545         obj = dt_locate(env, dev, fid);
546         if (IS_ERR(obj))
547                 RETURN(PTR_ERR(obj));
548
549         /* Generate an in-RAM object to stand for the layout rbtree.
550          * Scanning the layout rbtree will be via the iteration over
551          * the object. In the future, the rbtree may be written onto
552          * disk with the object.
553          *
554          * Mark the object to be as exist. */
555         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
556         obj->do_index_ops = &lfsck_orphan_index_ops;
557         llsd->llsd_rb_obj = obj;
558         llsd->llsd_rbtree_valid = 1;
559         dev->dd_record_fid_accessed = 1;
560
561         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
562                lfsck_lfsck2name(lfsck));
563
564         return 0;
565 }
566
567 static void lfsck_rbtree_cleanup(const struct lu_env *env,
568                                  struct lfsck_component *com)
569 {
570         struct lfsck_instance           *lfsck = com->lc_lfsck;
571         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
572         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
573         struct rb_node                  *next;
574         struct lfsck_rbtree_node        *lrn;
575
576         lfsck->li_bottom->dd_record_fid_accessed = 0;
577         /* Invalid the rbtree, then no others will use it. */
578         write_lock(&llsd->llsd_rb_lock);
579         llsd->llsd_rbtree_valid = 0;
580         write_unlock(&llsd->llsd_rb_lock);
581
582         while (node != NULL) {
583                 next = rb_next(node);
584                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
585                 rb_erase(node, &llsd->llsd_rb_root);
586                 lfsck_rbtree_free(lrn);
587                 node = next;
588         }
589
590         if (llsd->llsd_rb_obj != NULL) {
591                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
592                 llsd->llsd_rb_obj = NULL;
593         }
594
595         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
596                lfsck_lfsck2name(lfsck));
597 }
598
/*
 * Record \a fid in the bitmaps of the layout rbtree: always in the "known"
 * bitmap and, when \a accessed is true, in the "accessed" bitmap as well.
 * The tree node covering the FID's OID range is allocated and inserted on
 * demand.
 *
 * Nothing is done for FIDs rejected by fid_is_sane(), for FIDs accepted by
 * fid_is_last_id(), for FIDs that are neither IDIF nor normal, or when the
 * rbtree has already been invalidated.
 *
 * If the node allocation fails while \a accessed is true, the component is
 * flagged LF_INCOMPLETE and the whole rbtree is torn down via
 * lfsck_rbtree_cleanup().
 */
599 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
600                                        struct lfsck_component *com,
601                                        const struct lu_fid *fid,
602                                        bool accessed)
603 {
604         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
605         struct lfsck_rbtree_node        *lrn;
        /* false: the read lock is (or was last) held; true: the write lock
         * is held. The "unlock" label relies on this to drop the right one. */
606         bool                             insert = false;
607         int                              idx;
608         int                              rc     = 0;
609         ENTRY;
610
611         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
612                 RETURN_EXIT;
613
614         if (!fid_is_idif(fid) && !fid_is_norm(fid))
615                 RETURN_EXIT;
616
        /* Common case: the covering node already exists, so the read lock
         * is enough. */
617         read_lock(&llsd->llsd_rb_lock);
618         if (!llsd->llsd_rbtree_valid)
619                 GOTO(unlock, rc = 0);
620
621         lrn = lfsck_rbtree_search(llsd, fid, NULL);
622         if (lrn == NULL) {
623                 struct lfsck_rbtree_node *tmp;
624
625                 LASSERT(!insert);
626
                /* Allocate the new node with no lock held; on failure jump
                 * straight to "out" since there is nothing to unlock. */
627                 read_unlock(&llsd->llsd_rb_lock);
628                 tmp = lfsck_rbtree_new(env, fid);
629                 if (IS_ERR(tmp))
630                         GOTO(out, rc = PTR_ERR(tmp));
631
632                 insert = true;
633                 write_lock(&llsd->llsd_rb_lock);
                /* The tree may have been invalidated while no lock was
                 * held: re-check before touching it. */
634                 if (!llsd->llsd_rbtree_valid) {
635                         lfsck_rbtree_free(tmp);
636                         GOTO(unlock, rc = 0);
637                 }
638
                /* Someone else may have inserted a covering node meanwhile;
                 * in that case use theirs and discard ours. */
639                 lrn = lfsck_rbtree_insert(llsd, tmp);
640                 if (lrn != tmp)
641                         lfsck_rbtree_free(tmp);
642         }
643
        /* Bit position of this OID within the node's bitmaps. */
644         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
645         /* Any accessed object must be a known object. */
646         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
647                 atomic_inc(&lrn->lrn_known_count);
648         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
649                 atomic_inc(&lrn->lrn_accessed_count);
650
651         GOTO(unlock, rc = 0);
652
653 unlock:
654         if (insert)
655                 write_unlock(&llsd->llsd_rb_lock);
656         else
657                 read_unlock(&llsd->llsd_rb_lock);
658 out:
        /* rc is non-zero only when the node allocation above failed. */
659         if (rc != 0 && accessed) {
660                 struct lfsck_layout *lo = com->lc_file_ram;
661
662                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
663                        "bitmap, and will cause incorrect LFSCK OST-object "
664                        "handling, so disable it to cancel orphan handling "
665                        "for related device. rc = %d\n",
666                        lfsck_lfsck2name(com->lc_lfsck), rc);
667
668                 lo->ll_flags |= LF_INCOMPLETE;
669                 lfsck_rbtree_cleanup(env, com);
670         }
671 }
672
673 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
674                                    const struct lfsck_layout *src)
675 {
676         int i;
677
678         des->ll_magic = le32_to_cpu(src->ll_magic);
679         des->ll_status = le32_to_cpu(src->ll_status);
680         des->ll_flags = le32_to_cpu(src->ll_flags);
681         des->ll_success_count = le32_to_cpu(src->ll_success_count);
682         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
683         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
684         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
685         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
686         des->ll_time_last_checkpoint =
687                                 le64_to_cpu(src->ll_time_last_checkpoint);
688         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
689         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
690         des->ll_pos_first_inconsistent =
691                         le64_to_cpu(src->ll_pos_first_inconsistent);
692         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
693         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
694         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
695         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
696         for (i = 0; i < LLIT_MAX; i++)
697                 des->ll_objs_repaired[i] =
698                                 le64_to_cpu(src->ll_objs_repaired[i]);
699         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
700 }
701
702 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
703                                    const struct lfsck_layout *src)
704 {
705         int i;
706
707         des->ll_magic = cpu_to_le32(src->ll_magic);
708         des->ll_status = cpu_to_le32(src->ll_status);
709         des->ll_flags = cpu_to_le32(src->ll_flags);
710         des->ll_success_count = cpu_to_le32(src->ll_success_count);
711         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
712         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
713         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
714         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
715         des->ll_time_last_checkpoint =
716                                 cpu_to_le64(src->ll_time_last_checkpoint);
717         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
718         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
719         des->ll_pos_first_inconsistent =
720                         cpu_to_le64(src->ll_pos_first_inconsistent);
721         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
722         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
723         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
724         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
725         for (i = 0; i < LLIT_MAX; i++)
726                 des->ll_objs_repaired[i] =
727                                 cpu_to_le64(src->ll_objs_repaired[i]);
728         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
729 }
730
731 /**
732  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
733  * \retval 0: succeed.
734  * \retval -ve: failed cases.
735  */
736 static int lfsck_layout_load(const struct lu_env *env,
737                              struct lfsck_component *com)
738 {
739         struct lfsck_layout             *lo     = com->lc_file_ram;
740         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
741         ssize_t                          size   = com->lc_file_size;
742         loff_t                           pos    = 0;
743         int                              rc;
744
745         rc = dbo->dbo_read(env, com->lc_obj,
746                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
747                            BYPASS_CAPA);
748         if (rc == 0) {
749                 return -ENOENT;
750         } else if (rc < 0) {
751                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
752                        lfsck_lfsck2name(com->lc_lfsck), rc);
753                 return rc;
754         } else if (rc != size) {
755                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
756                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
757                 return 1;
758         }
759
760         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
761         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
762                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
763                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
764                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
765                 return 1;
766         }
767
768         return 0;
769 }
770
771 static int lfsck_layout_store(const struct lu_env *env,
772                               struct lfsck_component *com)
773 {
774         struct dt_object         *obj           = com->lc_obj;
775         struct lfsck_instance    *lfsck         = com->lc_lfsck;
776         struct lfsck_layout      *lo            = com->lc_file_disk;
777         struct thandle           *handle;
778         ssize_t                   size          = com->lc_file_size;
779         loff_t                    pos           = 0;
780         int                       rc;
781         ENTRY;
782
783         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
784         handle = dt_trans_create(env, lfsck->li_bottom);
785         if (IS_ERR(handle))
786                 GOTO(log, rc = PTR_ERR(handle));
787
788         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
789                                      pos, handle);
790         if (rc != 0)
791                 GOTO(out, rc);
792
793         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
794         if (rc != 0)
795                 GOTO(out, rc);
796
797         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
798                              handle);
799
800         GOTO(out, rc);
801
802 out:
803         dt_trans_stop(env, lfsck->li_bottom, handle);
804
805 log:
806         if (rc != 0)
807                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
808                        lfsck_lfsck2name(lfsck), rc);
809         return rc;
810 }
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         if (bk->lb_param & LPF_DRYRUN)
908                 return 0;
909
910         memset(la, 0, sizeof(*la));
911         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
912         la->la_valid = LA_MODE | LA_UID | LA_GID;
913         dof->dof_type = dt_mode_to_dft(S_IFREG);
914
915         th = dt_trans_create(env, dt);
916         if (IS_ERR(th))
917                 GOTO(log, rc = PTR_ERR(th));
918
919         rc = dt_declare_create(env, obj, la, NULL, dof, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         rc = dt_declare_record_write(env, obj,
924                                      lfsck_buf_get(env, &lastid,
925                                                    sizeof(lastid)),
926                                      pos, th);
927         if (rc != 0)
928                 GOTO(stop, rc);
929
930         rc = dt_trans_start_local(env, dt, th);
931         if (rc != 0)
932                 GOTO(stop, rc);
933
934         dt_write_lock(env, obj, 0);
935         if (likely(dt_object_exists(obj) == 0)) {
936                 rc = dt_create(env, obj, la, NULL, dof, th);
937                 if (rc == 0)
938                         rc = dt_record_write(env, obj,
939                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
940                                 &pos, th);
941         }
942         dt_write_unlock(env, obj);
943
944         GOTO(stop, rc);
945
946 stop:
947         dt_trans_stop(env, dt, th);
948
949 log:
950         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
951                LPX64": rc = %d\n",
952                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
953
954         return rc;
955 }
956
957 static int
958 lfsck_layout_lastid_reload(const struct lu_env *env,
959                            struct lfsck_component *com,
960                            struct lfsck_layout_seq *lls)
961 {
962         __u64   lastid;
963         loff_t  pos     = 0;
964         int     rc;
965
966         dt_read_lock(env, lls->lls_lastid_obj, 0);
967         rc = dt_record_read(env, lls->lls_lastid_obj,
968                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
969         dt_read_unlock(env, lls->lls_lastid_obj);
970         if (unlikely(rc != 0))
971                 return rc;
972
973         lastid = le64_to_cpu(lastid);
974         if (lastid < lls->lls_lastid_known) {
975                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
976                 struct lfsck_layout     *lo     = com->lc_file_ram;
977
978                 lls->lls_lastid = lls->lls_lastid_known;
979                 lls->lls_dirty = 1;
980                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
981                         LASSERT(lfsck->li_out_notify != NULL);
982
983                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
984                                              LE_LASTID_REBUILDING);
985                         lo->ll_flags |= LF_CRASHED_LASTID;
986
987                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
988                                "LAST_ID file (1) for the sequence "LPX64
989                                ", old value "LPU64", known value "LPU64"\n",
990                                lfsck_lfsck2name(lfsck), lls->lls_seq,
991                                lastid, lls->lls_lastid);
992                 }
993         } else if (lastid >= lls->lls_lastid) {
994                 lls->lls_lastid = lastid;
995                 lls->lls_dirty = 0;
996         }
997
998         return 0;
999 }
1000
1001 static int
1002 lfsck_layout_lastid_store(const struct lu_env *env,
1003                           struct lfsck_component *com)
1004 {
1005         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1006         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1007         struct dt_device                *dt     = lfsck->li_bottom;
1008         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1009         struct lfsck_layout_seq         *lls;
1010         struct thandle                  *th;
1011         __u64                            lastid;
1012         int                              rc     = 0;
1013         int                              rc1    = 0;
1014
1015         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1016                 loff_t pos = 0;
1017
1018                 if (!lls->lls_dirty)
1019                         continue;
1020
1021                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1022                        "<seq> "LPX64" as <oid> "LPU64"\n",
1023                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1024
1025                 if (bk->lb_param & LPF_DRYRUN) {
1026                         lls->lls_dirty = 0;
1027                         continue;
1028                 }
1029
1030                 th = dt_trans_create(env, dt);
1031                 if (IS_ERR(th)) {
1032                         rc1 = PTR_ERR(th);
1033                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1034                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1035                                lfsck_lfsck2name(com->lc_lfsck),
1036                                lls->lls_seq, rc1);
1037                         continue;
1038                 }
1039
1040                 lastid = cpu_to_le64(lls->lls_lastid);
1041                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1042                                              lfsck_buf_get(env, &lastid,
1043                                                            sizeof(lastid)),
1044                                              pos, th);
1045                 if (rc != 0)
1046                         goto stop;
1047
1048                 rc = dt_trans_start_local(env, dt, th);
1049                 if (rc != 0)
1050                         goto stop;
1051
1052                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1053                 rc = dt_record_write(env, lls->lls_lastid_obj,
1054                                      lfsck_buf_get(env, &lastid,
1055                                      sizeof(lastid)), &pos, th);
1056                 dt_write_unlock(env, lls->lls_lastid_obj);
1057                 if (rc == 0)
1058                         lls->lls_dirty = 0;
1059
1060 stop:
1061                 dt_trans_stop(env, dt, th);
1062                 if (rc != 0) {
1063                         rc1 = rc;
1064                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1065                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1066                                lfsck_lfsck2name(com->lc_lfsck),
1067                                lls->lls_seq, rc1);
1068                 }
1069         }
1070
1071         return rc1;
1072 }
1073
1074 static int
1075 lfsck_layout_lastid_load(const struct lu_env *env,
1076                          struct lfsck_component *com,
1077                          struct lfsck_layout_seq *lls)
1078 {
1079         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1080         struct lfsck_layout     *lo     = com->lc_file_ram;
1081         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1082         struct dt_object        *obj;
1083         loff_t                   pos    = 0;
1084         int                      rc;
1085         ENTRY;
1086
1087         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1088         obj = dt_locate(env, lfsck->li_bottom, fid);
1089         if (IS_ERR(obj))
1090                 RETURN(PTR_ERR(obj));
1091
1092         /* LAST_ID crashed, to be rebuilt */
1093         if (dt_object_exists(obj) == 0) {
1094                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1095                         LASSERT(lfsck->li_out_notify != NULL);
1096
1097                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1098                                              LE_LASTID_REBUILDING);
1099                         lo->ll_flags |= LF_CRASHED_LASTID;
1100
1101                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1102                                "LAST_ID file for sequence "LPX64"\n",
1103                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1104
1105                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1106                             cfs_fail_val > 0) {
1107                                 struct l_wait_info lwi = LWI_TIMEOUT(
1108                                                 cfs_time_seconds(cfs_fail_val),
1109                                                 NULL, NULL);
1110
1111                                 up_write(&com->lc_sem);
1112                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1113                                              !thread_is_running(&lfsck->li_thread),
1114                                              &lwi);
1115                                 down_write(&com->lc_sem);
1116                         }
1117                 }
1118
1119                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1120         } else {
1121                 dt_read_lock(env, obj, 0);
1122                 rc = dt_read(env, obj,
1123                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1124                         &pos);
1125                 dt_read_unlock(env, obj);
1126                 if (rc != 0 && rc != sizeof(__u64))
1127                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1128
1129                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1130                         LASSERT(lfsck->li_out_notify != NULL);
1131
1132                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1133                                              LE_LASTID_REBUILDING);
1134                         lo->ll_flags |= LF_CRASHED_LASTID;
1135
1136                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1137                                "LAST_ID file for the sequence "LPX64
1138                                ": rc = %d\n",
1139                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1140                 }
1141
1142                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1143                 rc = 0;
1144         }
1145
1146         GOTO(out, rc);
1147
1148 out:
1149         if (rc != 0)
1150                 lfsck_object_put(env, obj);
1151         else
1152                 lls->lls_lastid_obj = obj;
1153
1154         return rc;
1155 }
1156
1157 static void lfsck_layout_record_failure(const struct lu_env *env,
1158                                                  struct lfsck_instance *lfsck,
1159                                                  struct lfsck_layout *lo)
1160 {
1161         lo->ll_objs_failed_phase1++;
1162         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1163                 lo->ll_pos_first_inconsistent =
1164                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1165                                                         lfsck->li_di_oit);
1166
1167                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1168                        "inconsistency at the pos ["LPU64"]\n",
1169                        lfsck_lfsck2name(lfsck),
1170                        lo->ll_pos_first_inconsistent);
1171         }
1172 }
1173
1174 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1175                                                struct ptlrpc_request *req,
1176                                                void *args, int rc)
1177 {
1178         struct lfsck_async_interpret_args *laia = args;
1179         struct lfsck_component            *com  = laia->laia_com;
1180         struct lfsck_layout_master_data   *llmd = com->lc_data;
1181         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1182         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1183         struct lfsck_request              *lr   = laia->laia_lr;
1184
1185         switch (lr->lr_event) {
1186         case LE_START:
1187                 if (rc != 0) {
1188                         struct lfsck_layout *lo = com->lc_file_ram;
1189
1190                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1191                                "start: rc = %d\n",
1192                                lfsck_lfsck2name(com->lc_lfsck),
1193                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1194                                ltd->ltd_index, rc);
1195                         lo->ll_flags |= LF_INCOMPLETE;
1196                         break;
1197                 }
1198
1199                 spin_lock(&ltds->ltd_lock);
1200                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1201                         spin_unlock(&ltds->ltd_lock);
1202                         break;
1203                 }
1204
1205                 if (lr->lr_flags & LEF_TO_OST) {
1206                         if (list_empty(&ltd->ltd_layout_list))
1207                                 list_add_tail(&ltd->ltd_layout_list,
1208                                               &llmd->llmd_ost_list);
1209                         if (list_empty(&ltd->ltd_layout_phase_list))
1210                                 list_add_tail(&ltd->ltd_layout_phase_list,
1211                                               &llmd->llmd_ost_phase1_list);
1212                 } else {
1213                         if (list_empty(&ltd->ltd_layout_list))
1214                                 list_add_tail(&ltd->ltd_layout_list,
1215                                               &llmd->llmd_mdt_list);
1216                         if (list_empty(&ltd->ltd_layout_phase_list))
1217                                 list_add_tail(&ltd->ltd_layout_phase_list,
1218                                               &llmd->llmd_mdt_phase1_list);
1219                 }
1220                 spin_unlock(&ltds->ltd_lock);
1221                 break;
1222         case LE_STOP:
1223         case LE_PHASE1_DONE:
1224         case LE_PHASE2_DONE:
1225         case LE_PEER_EXIT:
1226                 if (rc != 0 && rc != -EALREADY)
1227                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1228                                "event = %d, rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck),
1230                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1231                                ltd->ltd_index, lr->lr_event, rc);
1232                 break;
1233         case LE_QUERY: {
1234                 struct lfsck_reply *reply;
1235
1236                 if (rc != 0) {
1237                         spin_lock(&ltds->ltd_lock);
1238                         list_del_init(&ltd->ltd_layout_phase_list);
1239                         list_del_init(&ltd->ltd_layout_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 }
1243
1244                 reply = req_capsule_server_get(&req->rq_pill,
1245                                                &RMF_LFSCK_REPLY);
1246                 if (reply == NULL) {
1247                         rc = -EPROTO;
1248                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1249                                lfsck_lfsck2name(com->lc_lfsck), rc);
1250                         spin_lock(&ltds->ltd_lock);
1251                         list_del_init(&ltd->ltd_layout_phase_list);
1252                         list_del_init(&ltd->ltd_layout_list);
1253                         spin_unlock(&ltds->ltd_lock);
1254                         break;
1255                 }
1256
1257                 switch (reply->lr_status) {
1258                 case LS_SCANNING_PHASE1:
1259                         break;
1260                 case LS_SCANNING_PHASE2:
1261                         spin_lock(&ltds->ltd_lock);
1262                         list_del_init(&ltd->ltd_layout_phase_list);
1263                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1264                                 spin_unlock(&ltds->ltd_lock);
1265                                 break;
1266                         }
1267
1268                         if (lr->lr_flags & LEF_TO_OST)
1269                                 list_add_tail(&ltd->ltd_layout_phase_list,
1270                                               &llmd->llmd_ost_phase2_list);
1271                         else
1272                                 list_add_tail(&ltd->ltd_layout_phase_list,
1273                                               &llmd->llmd_mdt_phase2_list);
1274                         spin_unlock(&ltds->ltd_lock);
1275                         break;
1276                 default:
1277                         spin_lock(&ltds->ltd_lock);
1278                         list_del_init(&ltd->ltd_layout_phase_list);
1279                         list_del_init(&ltd->ltd_layout_list);
1280                         spin_unlock(&ltds->ltd_lock);
1281                         break;
1282                 }
1283                 break;
1284         }
1285         default:
1286                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1287                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1288                 break;
1289         }
1290
1291         if (!laia->laia_shared) {
1292                 lfsck_tgt_put(ltd);
1293                 lfsck_component_put(env, com);
1294         }
1295
1296         return 0;
1297 }
1298
1299 static int lfsck_layout_master_query_others(const struct lu_env *env,
1300                                             struct lfsck_component *com)
1301 {
1302         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1303         struct lfsck_request              *lr    = &info->lti_lr;
1304         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1305         struct lfsck_instance             *lfsck = com->lc_lfsck;
1306         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1307         struct ptlrpc_request_set         *set;
1308         struct lfsck_tgt_descs            *ltds;
1309         struct lfsck_tgt_desc             *ltd;
1310         struct list_head                  *head;
1311         int                                rc    = 0;
1312         int                                rc1   = 0;
1313         ENTRY;
1314
1315         set = ptlrpc_prep_set();
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318
1319         llmd->llmd_touch_gen++;
1320         memset(lr, 0, sizeof(*lr));
1321         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1322         lr->lr_event = LE_QUERY;
1323         lr->lr_active = LFSCK_TYPE_LAYOUT;
1324         laia->laia_com = com;
1325         laia->laia_lr = lr;
1326         laia->laia_shared = 0;
1327
1328         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1329                 ltds = &lfsck->li_mdt_descs;
1330                 lr->lr_flags = 0;
1331                 head = &llmd->llmd_mdt_phase1_list;
1332         } else {
1333
1334 again:
1335                 ltds = &lfsck->li_ost_descs;
1336                 lr->lr_flags = LEF_TO_OST;
1337                 head = &llmd->llmd_ost_phase1_list;
1338         }
1339
1340         laia->laia_ltds = ltds;
1341         spin_lock(&ltds->ltd_lock);
1342         while (!list_empty(head)) {
1343                 ltd = list_entry(head->next,
1344                                  struct lfsck_tgt_desc,
1345                                  ltd_layout_phase_list);
1346                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1347                         break;
1348
1349                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1350                 list_move_tail(&ltd->ltd_layout_phase_list, head);
1351                 atomic_inc(&ltd->ltd_ref);
1352                 laia->laia_ltd = ltd;
1353                 spin_unlock(&ltds->ltd_lock);
1354                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1355                                          lfsck_layout_master_async_interpret,
1356                                          laia, LFSCK_QUERY);
1357                 if (rc != 0) {
1358                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1359                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1360                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1361                                ltd->ltd_index, rc);
1362                         lfsck_tgt_put(ltd);
1363                         rc1 = rc;
1364                 }
1365                 spin_lock(&ltds->ltd_lock);
1366         }
1367         spin_unlock(&ltds->ltd_lock);
1368
1369         rc = ptlrpc_set_wait(set);
1370         if (rc < 0) {
1371                 ptlrpc_set_destroy(set);
1372                 RETURN(rc);
1373         }
1374
1375         if (!(lr->lr_flags & LEF_TO_OST) &&
1376             list_empty(&llmd->llmd_mdt_phase1_list))
1377                 goto again;
1378
1379         ptlrpc_set_destroy(set);
1380
1381         RETURN(rc1 != 0 ? rc1 : rc);
1382 }
1383
1384 static inline bool
1385 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1386 {
1387         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1388                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1389                 list_empty(&llmd->llmd_ost_phase1_list));
1390 }
1391
1392 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1393                                              struct lfsck_component *com,
1394                                              struct lfsck_request *lr)
1395 {
1396         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1397         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1398         struct lfsck_instance             *lfsck = com->lc_lfsck;
1399         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1400         struct lfsck_layout               *lo    = com->lc_file_ram;
1401         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1402         struct ptlrpc_request_set         *set;
1403         struct lfsck_tgt_descs            *ltds;
1404         struct lfsck_tgt_desc             *ltd;
1405         struct lfsck_tgt_desc             *next;
1406         struct list_head                  *head;
1407         __u32                              idx;
1408         int                                rc    = 0;
1409         ENTRY;
1410
1411         set = ptlrpc_prep_set();
1412         if (set == NULL)
1413                 RETURN(-ENOMEM);
1414
1415         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1416         lr->lr_active = LFSCK_TYPE_LAYOUT;
1417         laia->laia_com = com;
1418         laia->laia_lr = lr;
1419         laia->laia_shared = 0;
1420         switch (lr->lr_event) {
1421         case LE_START:
1422                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1423                 ltds = &lfsck->li_ost_descs;
1424                 laia->laia_ltds = ltds;
1425                 down_read(&ltds->ltd_rw_sem);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = lfsck_tgt_get(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         laia->laia_ltd = ltd;
1431                         ltd->ltd_layout_done = 0;
1432                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1433                                         lfsck_layout_master_async_interpret,
1434                                         laia, LFSCK_NOTIFY);
1435                         if (rc != 0) {
1436                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1437                                        "notify %s %x for start: rc = %d\n",
1438                                        lfsck_lfsck2name(lfsck),
1439                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1440                                        "MDT", idx, rc);
1441                                 lfsck_tgt_put(ltd);
1442                                 lo->ll_flags |= LF_INCOMPLETE;
1443                         }
1444                 }
1445                 up_read(&ltds->ltd_rw_sem);
1446
1447                 /* Sync up */
1448                 rc = ptlrpc_set_wait(set);
1449                 if (rc < 0) {
1450                         ptlrpc_set_destroy(set);
1451                         RETURN(rc);
1452                 }
1453
1454                 if (!(bk->lb_param & LPF_ALL_TGT))
1455                         break;
1456
1457                 /* link other MDT targets locallly. */
1458                 ltds = &lfsck->li_mdt_descs;
1459                 spin_lock(&ltds->ltd_lock);
1460                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1461                         ltd = LTD_TGT(ltds, idx);
1462                         LASSERT(ltd != NULL);
1463
1464                         if (!list_empty(&ltd->ltd_layout_list))
1465                                 continue;
1466
1467                         list_add_tail(&ltd->ltd_layout_list,
1468                                       &llmd->llmd_mdt_list);
1469                         list_add_tail(&ltd->ltd_layout_phase_list,
1470                                       &llmd->llmd_mdt_phase1_list);
1471                 }
1472                 spin_unlock(&ltds->ltd_lock);
1473                 break;
1474         case LE_STOP:
1475         case LE_PHASE2_DONE:
1476         case LE_PEER_EXIT: {
1477                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1478                 if (bk->lb_param & LPF_ALL_TGT) {
1479                         head = &llmd->llmd_mdt_list;
1480                         ltds = &lfsck->li_mdt_descs;
1481                         if (lr->lr_event == LE_STOP) {
1482                                 /* unlink other MDT targets locallly. */
1483                                 spin_lock(&ltds->ltd_lock);
1484                                 list_for_each_entry_safe(ltd, next, head,
1485                                                          ltd_layout_list) {
1486                                         list_del_init(&ltd->ltd_layout_phase_list);
1487                                         list_del_init(&ltd->ltd_layout_list);
1488                                 }
1489                                 spin_unlock(&ltds->ltd_lock);
1490
1491                                 lr->lr_flags |= LEF_TO_OST;
1492                                 head = &llmd->llmd_ost_list;
1493                                 ltds = &lfsck->li_ost_descs;
1494                         } else {
1495                                 lr->lr_flags &= ~LEF_TO_OST;
1496                         }
1497                 } else {
1498                         lr->lr_flags |= LEF_TO_OST;
1499                         head = &llmd->llmd_ost_list;
1500                         ltds = &lfsck->li_ost_descs;
1501                 }
1502
1503 again:
1504                 laia->laia_ltds = ltds;
1505                 spin_lock(&ltds->ltd_lock);
1506                 while (!list_empty(head)) {
1507                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1508                                          ltd_layout_list);
1509                         if (!list_empty(&ltd->ltd_layout_phase_list))
1510                                 list_del_init(&ltd->ltd_layout_phase_list);
1511                         list_del_init(&ltd->ltd_layout_list);
1512                         atomic_inc(&ltd->ltd_ref);
1513                         laia->laia_ltd = ltd;
1514                         spin_unlock(&ltds->ltd_lock);
1515                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1516                                         lfsck_layout_master_async_interpret,
1517                                         laia, LFSCK_NOTIFY);
1518                         if (rc != 0) {
1519                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1520                                        "notify %s %x for stop/phase2_done/"
1521                                        "peer_exit: rc = %d\n",
1522                                        lfsck_lfsck2name(lfsck),
1523                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1524                                        "MDT", ltd->ltd_index, rc);
1525                                 lfsck_tgt_put(ltd);
1526                         }
1527                         spin_lock(&ltds->ltd_lock);
1528                 }
1529                 spin_unlock(&ltds->ltd_lock);
1530
1531                 rc = ptlrpc_set_wait(set);
1532                 if (rc < 0) {
1533                         ptlrpc_set_destroy(set);
1534                         RETURN(rc);
1535                 }
1536
1537                 if (!(lr->lr_flags & LEF_TO_OST)) {
1538                         lr->lr_flags |= LEF_TO_OST;
1539                         head = &llmd->llmd_ost_list;
1540                         ltds = &lfsck->li_ost_descs;
1541                         goto again;
1542                 }
1543                 break;
1544         }
1545         case LE_PHASE1_DONE:
1546                 llmd->llmd_touch_gen++;
1547                 ltds = &lfsck->li_mdt_descs;
1548                 laia->laia_ltds = ltds;
1549                 spin_lock(&ltds->ltd_lock);
1550                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1551                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1552                                          struct lfsck_tgt_desc,
1553                                          ltd_layout_phase_list);
1554                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1555                                 break;
1556
1557                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1558                         list_move_tail(&ltd->ltd_layout_phase_list,
1559                                        &llmd->llmd_mdt_phase1_list);
1560                         atomic_inc(&ltd->ltd_ref);
1561                         laia->laia_ltd = ltd;
1562                         spin_unlock(&ltds->ltd_lock);
1563                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1564                                         lfsck_layout_master_async_interpret,
1565                                         laia, LFSCK_NOTIFY);
1566                         if (rc != 0) {
1567                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1568                                        "notify MDT %x for phase1_done: "
1569                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1570                                        ltd->ltd_index, rc);
1571                                 lfsck_tgt_put(ltd);
1572                         }
1573                         spin_lock(&ltds->ltd_lock);
1574                 }
1575                 spin_unlock(&ltds->ltd_lock);
1576                 break;
1577         default:
1578                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1579                        lfsck_lfsck2name(lfsck), lr->lr_event);
1580                 rc = -EINVAL;
1581                 break;
1582         }
1583
1584         rc = ptlrpc_set_wait(set);
1585         ptlrpc_set_destroy(set);
1586
1587         RETURN(rc);
1588 }
1589
1590 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1591                                            struct lfsck_component *com,
1592                                            int rc)
1593 {
1594         struct lfsck_instance   *lfsck = com->lc_lfsck;
1595         struct lfsck_layout     *lo    = com->lc_file_ram;
1596         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1597
1598         down_write(&com->lc_sem);
1599         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1600                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1601         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1602         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1603
1604         if (rc > 0) {
1605                 com->lc_journal = 0;
1606                 if (lo->ll_flags & LF_INCOMPLETE)
1607                         lo->ll_status = LS_PARTIAL;
1608                 else
1609                         lo->ll_status = LS_COMPLETED;
1610                 if (!(bk->lb_param & LPF_DRYRUN))
1611                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1612                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1613                 lo->ll_success_count++;
1614         } else if (rc == 0) {
1615                 lo->ll_status = lfsck->li_status;
1616                 if (lo->ll_status == 0)
1617                         lo->ll_status = LS_STOPPED;
1618         } else {
1619                 lo->ll_status = LS_FAILED;
1620         }
1621
1622         rc = lfsck_layout_store(env, com);
1623         up_write(&com->lc_sem);
1624
1625         return rc;
1626 }
1627
1628 static int lfsck_layout_trans_stop(const struct lu_env *env,
1629                                    struct dt_device *dev,
1630                                    struct thandle *handle, int result)
1631 {
1632         int rc;
1633
1634         handle->th_result = result;
1635         rc = dt_trans_stop(env, dev, handle);
1636         if (rc > 0)
1637                 rc = 0;
1638         else if (rc == 0)
1639                 rc = 1;
1640
1641         return rc;
1642 }
1643
1644 /**
1645  * Get the system default stripe size.
1646  *
1647  * \param[in] env       pointer to the thread context
1648  * \param[in] lfsck     pointer to the lfsck instance
1649  * \param[out] size     pointer to the default stripe size
1650  *
1651  * \retval              0 for success
1652  * \retval              negative error number on failure
1653  */
1654 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1655                                            struct lfsck_instance *lfsck,
1656                                            __u32 *size)
1657 {
1658         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1659         struct dt_object        *root;
1660         int                      rc;
1661
1662         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1663         if (IS_ERR(root))
1664                 return PTR_ERR(root);
1665
1666         /* Get the default stripe size via xattr_get on the backend root. */
1667         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1668                           XATTR_NAME_LOV, BYPASS_CAPA);
1669         if (rc > 0) {
1670                 /* The lum->lmm_stripe_size is LE mode. The *size also
1671                  * should be LE mode. So it is unnecessary to convert. */
1672                 *size = lum->lmm_stripe_size;
1673                 rc = 0;
1674         } else if (unlikely(rc == 0)) {
1675                 rc = -EINVAL;
1676         }
1677
1678         lfsck_object_put(env, root);
1679
1680         return rc;
1681 }
1682
1683 /**
1684  * \retval       +1: repaired
1685  * \retval        0: did nothing
1686  * \retval      -ve: on error
1687  */
1688 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1689                                      struct thandle *handle,
1690                                      struct dt_object *parent,
1691                                      struct lu_fid *cfid,
1692                                      struct lu_buf *buf,
1693                                      struct lov_ost_data_v1 *slot,
1694                                      int fl, __u32 ost_idx)
1695 {
1696         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1697         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1698         struct lu_buf            ea_buf;
1699         int                      rc;
1700         __u32                    magic;
1701         __u16                    count;
1702
1703         magic = le32_to_cpu(lmm->lmm_magic);
1704         count = le16_to_cpu(lmm->lmm_stripe_count);
1705
1706         fid_to_ostid(cfid, oi);
1707         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1708         slot->l_ost_gen = cpu_to_le32(0);
1709         slot->l_ost_idx = cpu_to_le32(ost_idx);
1710
1711         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1712                 struct lov_ost_data_v1 *objs;
1713                 int                     i;
1714
1715                 if (magic == LOV_MAGIC_V1)
1716                         objs = &lmm->lmm_objects[0];
1717                 else
1718                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1719                 for (i = 0; i < count; i++, objs++) {
1720                         if (objs != slot && lovea_slot_is_dummy(objs))
1721                                 break;
1722                 }
1723
1724                 /* If the @slot is the last dummy slot to be refilled,
1725                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1726                 if (i == count)
1727                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1728         }
1729
1730         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1731         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1732                           BYPASS_CAPA);
1733         if (rc == 0)
1734                 rc = 1;
1735
1736         return rc;
1737 }
1738
1739 /**
1740  * \retval       +1: repaired
1741  * \retval        0: did nothing
1742  * \retval      -ve: on error
1743  */
1744 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1745                                      struct lfsck_instance *lfsck,
1746                                      struct thandle *handle,
1747                                      struct dt_object *parent,
1748                                      struct lu_fid *cfid,
1749                                      struct lu_buf *buf, int fl,
1750                                      __u32 ost_idx, __u32 ea_off, bool reset)
1751 {
1752         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1753         struct lov_ost_data_v1  *objs;
1754         int                      rc;
1755         __u16                    count;
1756         bool                     hole   = false;
1757         ENTRY;
1758
1759         if (fl == LU_XATTR_CREATE || reset) {
1760                 __u32 pattern = LOV_PATTERN_RAID0;
1761
1762                 count = ea_off + 1;
1763                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1764
1765                 if (ea_off != 0 || reset) {
1766                         pattern |= LOV_PATTERN_F_HOLE;
1767                         hole = true;
1768                 }
1769
1770                 memset(lmm, 0, buf->lb_len);
1771                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1772                 lmm->lmm_pattern = cpu_to_le32(pattern);
1773                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1774                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1775
1776                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1777                                                      &lmm->lmm_stripe_size);
1778                 if (rc != 0)
1779                         RETURN(rc);
1780
1781                 objs = &lmm->lmm_objects[ea_off];
1782         } else {
1783                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1784                 int     gap;
1785
1786                 count = le16_to_cpu(lmm->lmm_stripe_count);
1787                 if (magic == LOV_MAGIC_V1)
1788                         objs = &lmm->lmm_objects[count];
1789                 else
1790                         objs = &((struct lov_mds_md_v3 *)lmm)->
1791                                                         lmm_objects[count];
1792
1793                 gap = ea_off - count;
1794                 if (gap >= 0)
1795                         count = ea_off + 1;
1796                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1797
1798                 if (gap > 0) {
1799                         memset(objs, 0, gap * sizeof(*objs));
1800                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1801                         hole = true;
1802                 }
1803
1804                 lmm->lmm_layout_gen =
1805                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1806                 objs += gap;
1807         }
1808
1809         lmm->lmm_stripe_count = cpu_to_le16(count);
1810         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1811                                        fl, ost_idx);
1812
1813         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1814                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1815                "reset %s, %s LOV EA hole: rc = %d\n",
1816                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1817                ost_idx, ea_off, fl, reset ? "yes" : "no",
1818                hole ? "with" : "without", rc);
1819
1820         RETURN(rc);
1821 }
1822
1823 /**
1824  * \retval       +1: repaired
1825  * \retval        0: did nothing
1826  * \retval      -ve: on error
1827  */
1828 static int lfsck_layout_update_pfid(const struct lu_env *env,
1829                                     struct lfsck_component *com,
1830                                     struct dt_object *parent,
1831                                     struct lu_fid *cfid,
1832                                     struct dt_device *cdev, __u32 ea_off)
1833 {
1834         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1835         struct dt_object        *child;
1836         struct thandle          *handle;
1837         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1838         struct lu_buf           *buf;
1839         int                      rc     = 0;
1840         ENTRY;
1841
1842         child = lfsck_object_find_by_dev(env, cdev, cfid);
1843         if (IS_ERR(child))
1844                 RETURN(PTR_ERR(child));
1845
1846         handle = dt_trans_create(env, cdev);
1847         if (IS_ERR(handle))
1848                 GOTO(out, rc = PTR_ERR(handle));
1849
1850         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1851         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1852         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1853          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1854          * parent MDT-object's layout EA. */
1855         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1856         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1857
1858         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1859         if (rc != 0)
1860                 GOTO(stop, rc);
1861
1862         rc = dt_trans_start(env, cdev, handle);
1863         if (rc != 0)
1864                 GOTO(stop, rc);
1865
1866         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1867                           BYPASS_CAPA);
1868
1869         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1870
1871 stop:
1872         dt_trans_stop(env, cdev, handle);
1873
1874 out:
1875         lu_object_put(env, &child->do_lu);
1876
1877         return rc;
1878 }
1879
1880 /**
1881  * This function will create the MDT-object with the given (partial) LOV EA.
1882  *
1883  * Under some data corruption cases, the MDT-object of the file may be lost,
1884  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1885  * re-create the MDT-object with the orphan OST-object(s) information.
1886  *
1887  * On the other hand, the LFSCK may has created some OST-object for repairing
1888  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1889  * the old OST-object is there and should replace the former new created OST
1890  * object. Unfortunately, some others have modified such newly created object.
1891  * To keep the data (both new and old), the LFSCK will create MDT-object with
1892  * new FID to reference the original OST-object.
1893  *
1894  * \param[in] env       pointer to the thread context
1895  * \param[in] com       pointer to the lfsck component
1896  * \param[in] ltd       pointer to target device descriptor
1897  * \param[in] rec       pointer to the record for the orphan OST-object
1898  * \param[in] cfid      pointer to FID for the orphan OST-object
1899  * \param[in] infix     additional information, such as the FID for original
1900  *                      MDT-object and the stripe offset in the LOV EA
1901  * \param[in] type      the type for describing why the orphan MDT-object is
1902  *                      created. The rules are as following:
1903  *
1904  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1905  *                      same slot in the layout EA. Then the LFSCK will create
1906  *                      new MDT-object(s) to hold the conflict OST-object(s).
1907  *
1908  *  type "N":           The orphan OST-object does not know which one was the
1909  *                      real parent MDT-object, so the LFSCK uses new FID for
1910  *                      its parent MDT-object.
1911  *
1912  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1913  *                      but does not know the position (the file name) in the
1914  *                      namespace.
1915  *
1916  * The orphan name will be like:
1917  * ${FID}-${infix}-${type}-${conflict_version}
1918  *
1919  * \param[in] ea_off    the stripe offset in the LOV EA
1920  *
1921  * \retval              positive on repaired something
1922  * \retval              0 if needs to repair nothing
1923  * \retval              negative error number on failure
1924  */
1925 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1926                                         struct lfsck_component *com,
1927                                         struct lfsck_tgt_desc *ltd,
1928                                         struct lu_orphan_rec *rec,
1929                                         struct lu_fid *cfid,
1930                                         const char *infix,
1931                                         const char *type,
1932                                         __u32 ea_off)
1933 {
1934         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1935         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1936         char                            *name   = info->lti_key;
1937         struct lu_attr                  *la     = &info->lti_la;
1938         struct dt_object_format         *dof    = &info->lti_dof;
1939         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1940         struct lu_fid                   *pfid   = &rec->lor_fid;
1941         struct lu_fid                   *tfid   = &info->lti_fid3;
1942         struct dt_device                *next   = lfsck->li_next;
1943         struct dt_object                *pobj   = NULL;
1944         struct dt_object                *cobj   = NULL;
1945         struct thandle                  *th     = NULL;
1946         struct lu_buf                    pbuf   = { 0 };
1947         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1948         struct lu_buf                    lov_buf;
1949         struct lustre_handle             lh     = { 0 };
1950         struct linkea_data               ldata  = { 0 };
1951         struct lu_buf                    linkea_buf;
1952         const struct lu_name            *pname;
1953         int                              size   = 0;
1954         int                              idx    = 0;
1955         int                              rc     = 0;
1956         ENTRY;
1957
1958         /* Create .lustre/lost+found/MDTxxxx when needed. */
1959         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1960                 rc = lfsck_create_lpf(env, lfsck);
1961                 if (rc != 0)
1962                         GOTO(log, rc);
1963         }
1964
1965         if (fid_is_zero(pfid)) {
1966                 struct filter_fid *ff = &info->lti_new_pfid;
1967
1968                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1969                 if (rc != 0)
1970                         RETURN(rc);
1971
1972                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1973                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1974                 /* Currently, the filter_fid::ff_parent::f_ver is not the
1975                  * real parent MDT-object's FID::f_ver, instead it is the
1976                  * OST-object index in its parent MDT-object's layout EA. */
1977                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1978                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
1979                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1980                 if (IS_ERR(cobj))
1981                         GOTO(log, rc = PTR_ERR(cobj));
1982         }
1983
1984         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1985         if (IS_ERR(pobj))
1986                 GOTO(put, rc = PTR_ERR(pobj));
1987
1988         LASSERT(infix != NULL);
1989         LASSERT(type != NULL);
1990
1991         do {
1992                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
1993                          type, idx++);
1994                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1995                                (const struct dt_key *)name, BYPASS_CAPA);
1996                 if (rc != 0 && rc != -ENOENT)
1997                         GOTO(put, rc);
1998         } while (rc == 0);
1999
2000         rc = linkea_data_new(&ldata,
2001                              &lfsck_env_info(env)->lti_linkea_buf);
2002         if (rc != 0)
2003                 GOTO(put, rc);
2004
2005         pname = lfsck_name_get_const(env, name, strlen(name));
2006         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2007         if (rc != 0)
2008                 GOTO(put, rc);
2009
2010         memset(la, 0, sizeof(*la));
2011         la->la_uid = rec->lor_uid;
2012         la->la_gid = rec->lor_gid;
2013         la->la_mode = S_IFREG | S_IRUSR;
2014         la->la_valid = LA_MODE | LA_UID | LA_GID;
2015
2016         memset(dof, 0, sizeof(*dof));
2017         dof->dof_type = dt_mode_to_dft(S_IFREG);
2018
2019         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2020         if (ea_buf->lb_len < size) {
2021                 lu_buf_realloc(ea_buf, size);
2022                 if (ea_buf->lb_buf == NULL)
2023                         GOTO(put, rc = -ENOMEM);
2024         }
2025
2026         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2027          *
2028          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2029          *      because creating MDT-object for orphan OST-object is rare, we
2030          *      do not much care about the performance. It can be improved in
2031          *      the future when needed. */
2032         rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
2033                               MDS_INODELOCK_UPDATE, LCK_EX);
2034         if (rc != 0)
2035                 GOTO(put, rc);
2036
2037         th = dt_trans_create(env, next);
2038         if (IS_ERR(th))
2039                 GOTO(unlock, rc = PTR_ERR(th));
2040
2041         /* 1a. Update OST-object's parent information remotely.
2042          *
2043          * If other subsequent modifications failed, then next LFSCK scanning
2044          * will process the OST-object as orphan again with known parent FID. */
2045         if (cobj != NULL) {
2046                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
2047                                           0, th);
2048                 if (rc != 0)
2049                         GOTO(stop, rc);
2050         }
2051
2052         /* 2a. Create the MDT-object locally. */
2053         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2054         if (rc != 0)
2055                 GOTO(stop, rc);
2056
2057         /* 3a. Add layout EA for the MDT-object. */
2058         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2059         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2060                                   LU_XATTR_CREATE, th);
2061         if (rc != 0)
2062                 GOTO(stop, rc);
2063
2064         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2065         dtrec->rec_fid = pfid;
2066         dtrec->rec_type = S_IFREG;
2067         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2068                                (const struct dt_rec *)dtrec,
2069                                (const struct dt_key *)name, th);
2070         if (rc != 0)
2071                 GOTO(stop, rc);
2072
2073         /* 5a. insert linkEA for parent. */
2074         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2075                        ldata.ld_leh->leh_len);
2076         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2077                                   XATTR_NAME_LINK, 0, th);
2078         if (rc != 0)
2079                 GOTO(stop, rc);
2080
2081         rc = dt_trans_start(env, next, th);
2082         if (rc != 0)
2083                 GOTO(stop, rc);
2084
2085         /* 1b. Update OST-object's parent information remotely. */
2086         if (cobj != NULL) {
2087                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
2088                                   BYPASS_CAPA);
2089                 if (rc != 0)
2090                         GOTO(stop, rc);
2091         }
2092
2093         dt_write_lock(env, pobj, 0);
2094         /* 2b. Create the MDT-object locally. */
2095         rc = dt_create(env, pobj, la, NULL, dof, th);
2096         if (rc == 0)
2097                 /* 3b. Add layout EA for the MDT-object. */
2098                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2099                                                &lov_buf, LU_XATTR_CREATE,
2100                                                ltd->ltd_index, ea_off, false);
2101         dt_write_unlock(env, pobj);
2102         if (rc < 0)
2103                 GOTO(stop, rc);
2104
2105         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2106         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
2107                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2108         if (rc != 0)
2109                 GOTO(stop, rc);
2110
2111         /* 5b. insert linkEA for parent. */
2112         rc = dt_xattr_set(env, pobj, &linkea_buf,
2113                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2114
2115         GOTO(stop, rc);
2116
2117 stop:
2118         dt_trans_stop(env, next, th);
2119
2120 unlock:
2121         lfsck_ibits_unlock(&lh, LCK_EX);
2122
2123 put:
2124         if (cobj != NULL && !IS_ERR(cobj))
2125                 lu_object_put(env, &cobj->do_lu);
2126         if (pobj != NULL && !IS_ERR(pobj))
2127                 lu_object_put(env, &pobj->do_lu);
2128
2129 log:
2130         if (rc < 0)
2131                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2132                        "recreate the lost MDT-object: parent "DFID
2133                        ", child "DFID", OST-index %u, stripe-index %u, "
2134                        "infix %s, type %s: rc = %d\n",
2135                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2136                        ltd->ltd_index, ea_off, infix, type, rc);
2137
2138         return rc >= 0 ? 1 : rc;
2139 }
2140
2141 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2142                                                    struct lfsck_component *com,
2143                                                    const struct lu_fid *fid,
2144                                                    __u32 index)
2145 {
2146         struct lfsck_thread_info *info  = lfsck_env_info(env);
2147         struct lfsck_request     *lr    = &info->lti_lr;
2148         struct lfsck_instance    *lfsck = com->lc_lfsck;
2149         struct lfsck_tgt_desc    *ltd;
2150         struct ptlrpc_request    *req;
2151         struct lfsck_request     *tmp;
2152         struct obd_export        *exp;
2153         int                       rc    = 0;
2154         ENTRY;
2155
2156         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2157         if (unlikely(ltd == NULL))
2158                 RETURN(-ENXIO);
2159
2160         exp = ltd->ltd_exp;
2161         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2162                 GOTO(put, rc = -EOPNOTSUPP);
2163
2164         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2165         if (req == NULL)
2166                 GOTO(put, rc = -ENOMEM);
2167
2168         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2169         if (rc != 0) {
2170                 ptlrpc_request_free(req);
2171
2172                 GOTO(put, rc);
2173         }
2174
2175         memset(lr, 0, sizeof(*lr));
2176         lr->lr_event = LE_CONDITIONAL_DESTROY;
2177         lr->lr_active = LFSCK_TYPE_LAYOUT;
2178         lr->lr_fid = *fid;
2179
2180         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2181         *tmp = *lr;
2182         ptlrpc_request_set_replen(req);
2183
2184         rc = ptlrpc_queue_wait(req);
2185         ptlrpc_req_finished(req);
2186
2187         GOTO(put, rc);
2188
2189 put:
2190         lfsck_tgt_put(ltd);
2191
2192         return rc;
2193 }
2194
2195 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2196                                                   struct lfsck_component *com,
2197                                                   struct lfsck_request *lr)
2198 {
2199         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2200         struct lu_attr                  *la     = &info->lti_la;
2201         ldlm_policy_data_t              *policy = &info->lti_policy;
2202         struct ldlm_res_id              *resid  = &info->lti_resid;
2203         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2204         struct dt_device                *dev    = lfsck->li_bottom;
2205         struct lu_fid                   *fid    = &lr->lr_fid;
2206         struct dt_object                *obj;
2207         struct thandle                  *th     = NULL;
2208         struct lustre_handle             lh     = { 0 };
2209         __u64                            flags  = 0;
2210         int                              rc     = 0;
2211         ENTRY;
2212
2213         obj = lfsck_object_find_by_dev(env, dev, fid);
2214         if (IS_ERR(obj))
2215                 RETURN(PTR_ERR(obj));
2216
2217         dt_read_lock(env, obj, 0);
2218         if (dt_object_exists(obj) == 0 ||
2219             lfsck_is_dead_obj(obj)) {
2220                 dt_read_unlock(env, obj);
2221
2222                 GOTO(put, rc = -ENOENT);
2223         }
2224
2225         /* Get obj's attr without lock firstly. */
2226         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2227         dt_read_unlock(env, obj);
2228         if (rc != 0)
2229                 GOTO(put, rc);
2230
2231         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2232                 GOTO(put, rc = -ETXTBSY);
2233
2234         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2235         LASSERT(lfsck->li_namespace != NULL);
2236
2237         memset(policy, 0, sizeof(*policy));
2238         policy->l_extent.end = OBD_OBJECT_EOF;
2239         ost_fid_build_resid(fid, resid);
2240         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2241                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2242                                     ldlm_completion_ast, NULL, NULL, 0,
2243                                     LVB_T_NONE, NULL, &lh);
2244         if (rc != ELDLM_OK)
2245                 GOTO(put, rc = -EIO);
2246
2247         dt_write_lock(env, obj, 0);
2248         /* Get obj's attr within lock again. */
2249         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2250         if (rc != 0)
2251                 GOTO(unlock, rc);
2252
2253         if (la->la_ctime != 0)
2254                 GOTO(unlock, rc = -ETXTBSY);
2255
2256         th = dt_trans_create(env, dev);
2257         if (IS_ERR(th))
2258                 GOTO(unlock, rc = PTR_ERR(th));
2259
2260         rc = dt_declare_ref_del(env, obj, th);
2261         if (rc != 0)
2262                 GOTO(stop, rc);
2263
2264         rc = dt_declare_destroy(env, obj, th);
2265         if (rc != 0)
2266                 GOTO(stop, rc);
2267
2268         rc = dt_trans_start_local(env, dev, th);
2269         if (rc != 0)
2270                 GOTO(stop, rc);
2271
2272         rc = dt_ref_del(env, obj, th);
2273         if (rc != 0)
2274                 GOTO(stop, rc);
2275
2276         rc = dt_destroy(env, obj, th);
2277         if (rc == 0)
2278                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2279                        "OST-object "DFID" that was created for reparing "
2280                        "dangling referenced case. But the original missed "
2281                        "OST-object is found now.\n",
2282                        lfsck_lfsck2name(lfsck), PFID(fid));
2283
2284         GOTO(stop, rc);
2285
2286 stop:
2287         dt_trans_stop(env, dev, th);
2288
2289 unlock:
2290         dt_write_unlock(env, obj);
2291         ldlm_lock_decref(&lh, LCK_EX);
2292
2293 put:
2294         lu_object_put(env, &obj->do_lu);
2295
2296         return rc;
2297 }
2298
2299 /**
2300  * Some OST-object has occupied the specified layout EA slot.
2301  * Such OST-object may be generated by the LFSCK when repair
2302  * dangling referenced MDT-object, which can be indicated by
2303  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2304  * is true and such OST-object has not been modified yet, we
2305  * will replace it with the orphan OST-object; otherwise the
2306  * LFSCK will create new MDT-object to reference the orphan.
2307  *
2308  * \retval       +1: repaired
2309  * \retval        0: did nothing
2310  * \retval      -ve: on error
2311  */
2312 static int lfsck_layout_conflict_create(const struct lu_env *env,
2313                                         struct lfsck_component *com,
2314                                         struct lfsck_tgt_desc *ltd,
2315                                         struct lu_orphan_rec *rec,
2316                                         struct dt_object *parent,
2317                                         struct lu_fid *cfid,
2318                                         struct lu_buf *ea_buf,
2319                                         struct lov_ost_data_v1 *slot,
2320                                         __u32 ea_off)
2321 {
2322         struct lfsck_thread_info *info          = lfsck_env_info(env);
2323         struct lu_fid            *cfid2         = &info->lti_fid2;
2324         struct ost_id            *oi            = &info->lti_oi;
2325         char                     *infix         = info->lti_tmpbuf;
2326         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2327         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2328         struct thandle           *th            = NULL;
2329         struct lustre_handle      lh            = { 0 };
2330         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2331         int                       rc            = 0;
2332         ENTRY;
2333
2334         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2335         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2336         if (rc != 0)
2337                 GOTO(out, rc);
2338
2339         /* Hold layout lock on the parent to prevent others to access. */
2340         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2341                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2342                               LCK_EX);
2343         if (rc != 0)
2344                 GOTO(out, rc);
2345
2346         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2347
2348         /* If the conflict OST-obejct is not created for fixing dangling
2349          * referenced MDT-object in former LFSCK check/repair, or it has
2350          * been modified by others, then we cannot destroy it. Re-create
2351          * a new MDT-object for the orphan OST-object. */
2352         if (rc == -ETXTBSY) {
2353                 /* No need the layout lock on the original parent. */
2354                 lfsck_ibits_unlock(&lh, LCK_EX);
2355
2356                 fid_zero(&rec->lor_fid);
2357                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2358                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2359                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2360                                                   infix, "C", ea_off);
2361
2362                 RETURN(rc);
2363         }
2364
2365         if (rc != 0 && rc != -ENOENT)
2366                 GOTO(unlock, rc);
2367
2368         th = dt_trans_create(env, dev);
2369         if (IS_ERR(th))
2370                 GOTO(unlock, rc = PTR_ERR(th));
2371
2372         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2373                                   LU_XATTR_REPLACE, th);
2374         if (rc != 0)
2375                 GOTO(stop, rc);
2376
2377         rc = dt_trans_start_local(env, dev, th);
2378         if (rc != 0)
2379                 GOTO(stop, rc);
2380
2381         dt_write_lock(env, parent, 0);
2382         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2383         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2384                                        LU_XATTR_REPLACE, ltd->ltd_index);
2385         dt_write_unlock(env, parent);
2386
2387         GOTO(stop, rc);
2388
2389 stop:
2390         dt_trans_stop(env, dev, th);
2391
2392 unlock:
2393         lfsck_ibits_unlock(&lh, LCK_EX);
2394
2395 out:
2396         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2397                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2398                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2399                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2400                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2401                ea_off, rc);
2402
2403         return rc >= 0 ? 1 : rc;
2404 }
2405
2406 /**
2407  * \retval       +1: repaired
2408  * \retval        0: did nothing
2409  * \retval      -ve: on error
2410  */
2411 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2412                                        struct lfsck_component *com,
2413                                        struct lfsck_tgt_desc *ltd,
2414                                        struct lu_orphan_rec *rec,
2415                                        struct dt_object *parent,
2416                                        struct lu_fid *cfid,
2417                                        __u32 ost_idx, __u32 ea_off)
2418 {
2419         struct lfsck_thread_info *info          = lfsck_env_info(env);
2420         struct lu_buf            *buf           = &info->lti_big_buf;
2421         struct lu_fid            *fid           = &info->lti_fid2;
2422         struct ost_id            *oi            = &info->lti_oi;
2423         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2424         struct dt_device         *dt            = lfsck->li_bottom;
2425         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2426         struct thandle            *handle       = NULL;
2427         size_t                    lovea_size;
2428         struct lov_mds_md_v1     *lmm;
2429         struct lov_ost_data_v1   *objs;
2430         struct lustre_handle      lh            = { 0 };
2431         __u32                     magic;
2432         int                       fl            = 0;
2433         int                       rc            = 0;
2434         int                       rc1;
2435         int                       i;
2436         __u16                     count;
2437         bool                      locked        = false;
2438         ENTRY;
2439
2440         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2441                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2442                               LCK_EX);
2443         if (rc != 0) {
2444                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2445                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2446                        "stripe-index %u: rc = %d\n",
2447                        lfsck_lfsck2name(lfsck), PFID(cfid),
2448                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2449
2450                 RETURN(rc);
2451         }
2452
2453 again:
2454         if (locked) {
2455                 dt_write_unlock(env, parent);
2456                 locked = false;
2457         }
2458
2459         if (handle != NULL) {
2460                 dt_trans_stop(env, dt, handle);
2461                 handle = NULL;
2462         }
2463
2464         if (rc < 0)
2465                 GOTO(unlock_layout, rc);
2466
2467         lovea_size = rc;
2468         if (buf->lb_len < lovea_size) {
2469                 lu_buf_realloc(buf, lovea_size);
2470                 if (buf->lb_buf == NULL)
2471                         GOTO(unlock_layout, rc = -ENOMEM);
2472         }
2473
2474         if (!(bk->lb_param & LPF_DRYRUN)) {
2475                 handle = dt_trans_create(env, dt);
2476                 if (IS_ERR(handle))
2477                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2478
2479                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2480                                           fl, handle);
2481                 if (rc != 0)
2482                         GOTO(stop, rc);
2483
2484                 rc = dt_trans_start_local(env, dt, handle);
2485                 if (rc != 0)
2486                         GOTO(stop, rc);
2487         }
2488
2489         dt_write_lock(env, parent, 0);
2490         locked = true;
2491         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2492         if (rc == -ERANGE) {
2493                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2494                                   BYPASS_CAPA);
2495                 LASSERT(rc != 0);
2496                 goto again;
2497         } else if (rc == -ENODATA || rc == 0) {
2498                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2499                 /* If the declared is not big enough, re-try. */
2500                 if (buf->lb_len < lovea_size) {
2501                         rc = lovea_size;
2502                         goto again;
2503                 }
2504                 fl = LU_XATTR_CREATE;
2505         } else if (rc < 0) {
2506                 GOTO(unlock_parent, rc);
2507         } else if (unlikely(buf->lb_len == 0)) {
2508                 goto again;
2509         } else {
2510                 fl = LU_XATTR_REPLACE;
2511                 lovea_size = rc;
2512         }
2513
2514         if (fl == LU_XATTR_CREATE) {
2515                 if (bk->lb_param & LPF_DRYRUN)
2516                         GOTO(unlock_parent, rc = 1);
2517
2518                 LASSERT(buf->lb_len >= lovea_size);
2519
2520                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2521                                                buf, fl, ost_idx, ea_off, false);
2522
2523                 GOTO(unlock_parent, rc);
2524         }
2525
2526         lmm = buf->lb_buf;
2527         rc1 = lfsck_layout_verify_header(lmm);
2528
2529         /* If the LOV EA crashed, the rebuild it. */
2530         if (rc1 == -EINVAL) {
2531                 if (bk->lb_param & LPF_DRYRUN)
2532                         GOTO(unlock_parent, rc = 1);
2533
2534                 LASSERT(buf->lb_len >= lovea_size);
2535
2536                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2537                                                buf, fl, ost_idx, ea_off, true);
2538
2539                 GOTO(unlock_parent, rc);
2540         }
2541
2542         /* For other unknown magic/pattern, keep the current LOV EA. */
2543         if (rc1 != 0)
2544                 GOTO(unlock_parent, rc = rc1);
2545
2546         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2547          * been verified in lfsck_layout_verify_header() already. If some
2548          * new magic introduced in the future, then layout LFSCK needs to
2549          * be updated also. */
2550         magic = le32_to_cpu(lmm->lmm_magic);
2551         if (magic == LOV_MAGIC_V1) {
2552                 objs = &lmm->lmm_objects[0];
2553         } else {
2554                 LASSERT(magic == LOV_MAGIC_V3);
2555                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2556         }
2557
2558         count = le16_to_cpu(lmm->lmm_stripe_count);
2559         if (count == 0)
2560                 GOTO(unlock_parent, rc = -EINVAL);
2561         LASSERT(count > 0);
2562
2563         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2564         if (count <= ea_off) {
2565                 if (bk->lb_param & LPF_DRYRUN)
2566                         GOTO(unlock_parent, rc = 1);
2567
2568                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2569                 /* If the declared is not big enough, re-try. */
2570                 if (buf->lb_len < lovea_size) {
2571                         rc = lovea_size;
2572                         goto again;
2573                 }
2574
2575                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2576                                                buf, fl, ost_idx, ea_off, false);
2577
2578                 GOTO(unlock_parent, rc);
2579         }
2580
2581         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2582
2583         for (i = 0; i < count; i++, objs++) {
2584                 /* The MDT-object was created via lfsck_layout_recover_create()
2585                  * by others before, and we fill the dummy layout EA. */
2586                 if (lovea_slot_is_dummy(objs)) {
2587                         if (i != ea_off)
2588                                 continue;
2589
2590                         if (bk->lb_param & LPF_DRYRUN)
2591                                 GOTO(unlock_parent, rc = 1);
2592
2593                         lmm->lmm_layout_gen =
2594                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2595                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2596                                                        cfid, buf, objs, fl,
2597                                                        ost_idx);
2598
2599                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2600                                "dummy layout slot for "DFID": parent "DFID
2601                                ", OST-index %u, stripe-index %u: rc = %d\n",
2602                                lfsck_lfsck2name(lfsck), PFID(cfid),
2603                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2604
2605                         GOTO(unlock_parent, rc);
2606                 }
2607
2608                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2609                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2610                 if (rc != 0) {
2611                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2612                                "invalid layout EA at the slot %d, index %u\n",
2613                                lfsck_lfsck2name(lfsck),
2614                                PFID(lfsck_dto2fid(parent)), i,
2615                                le32_to_cpu(objs->l_ost_idx));
2616
2617                         GOTO(unlock_parent, rc);
2618                 }
2619
2620                 /* It should be rare case, the slot is there, but the LFSCK
2621                  * does not handle it during the first-phase cycle scanning. */
2622                 if (unlikely(lu_fid_eq(fid, cfid))) {
2623                         if (i == ea_off) {
2624                                 GOTO(unlock_parent, rc = 0);
2625                         } else {
2626                                 /* Rare case that the OST-object index
2627                                  * does not match the parent MDT-object
2628                                  * layout EA. We trust the later one. */
2629                                 if (bk->lb_param & LPF_DRYRUN)
2630                                         GOTO(unlock_parent, rc = 1);
2631
2632                                 dt_write_unlock(env, parent);
2633                                 if (handle != NULL)
2634                                         dt_trans_stop(env, dt, handle);
2635                                 lfsck_ibits_unlock(&lh, LCK_EX);
2636                                 rc = lfsck_layout_update_pfid(env, com, parent,
2637                                                         cfid, ltd->ltd_tgt, i);
2638
2639                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2640                                        "updated OST-object's pfid for "DFID
2641                                        ": parent "DFID", OST-index %u, "
2642                                        "stripe-index %u: rc = %d\n",
2643                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2644                                        PFID(lfsck_dto2fid(parent)),
2645                                        ltd->ltd_index, i, rc);
2646
2647                                 RETURN(rc);
2648                         }
2649                 }
2650         }
2651
2652         /* The MDT-object exists, but related layout EA slot is occupied
2653          * by others. */
2654         if (bk->lb_param & LPF_DRYRUN)
2655                 GOTO(unlock_parent, rc = 1);
2656
2657         dt_write_unlock(env, parent);
2658         if (handle != NULL)
2659                 dt_trans_stop(env, dt, handle);
2660         lfsck_ibits_unlock(&lh, LCK_EX);
2661         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2662                 objs = &lmm->lmm_objects[ea_off];
2663         else
2664                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2665         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2666                                           buf, objs, ea_off);
2667
2668         RETURN(rc);
2669
2670 unlock_parent:
2671         if (locked)
2672                 dt_write_unlock(env, parent);
2673
2674 stop:
2675         if (handle != NULL)
2676                 dt_trans_stop(env, dt, handle);
2677
2678 unlock_layout:
2679         lfsck_ibits_unlock(&lh, LCK_EX);
2680
2681         return rc;
2682 }
2683
2684 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2685                                         struct lfsck_component *com,
2686                                         struct lfsck_tgt_desc *ltd,
2687                                         struct lu_orphan_rec *rec,
2688                                         struct lu_fid *cfid)
2689 {
2690         struct lfsck_layout     *lo     = com->lc_file_ram;
2691         struct lu_fid           *pfid   = &rec->lor_fid;
2692         struct dt_object        *parent = NULL;
2693         __u32                    ea_off = pfid->f_stripe_idx;
2694         int                      rc     = 0;
2695         ENTRY;
2696
2697         if (!fid_is_sane(cfid))
2698                 GOTO(out, rc = -EINVAL);
2699
2700         if (fid_is_zero(pfid)) {
2701                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2702                                                   "", "N", ea_off);
2703                 GOTO(out, rc);
2704         }
2705
2706         pfid->f_ver = 0;
2707         if (!fid_is_sane(pfid))
2708                 GOTO(out, rc = -EINVAL);
2709
2710         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2711         if (IS_ERR(parent))
2712                 GOTO(out, rc = PTR_ERR(parent));
2713
2714         if (unlikely(dt_object_remote(parent) != 0))
2715                 GOTO(put, rc = -EXDEV);
2716
2717         if (dt_object_exists(parent) == 0) {
2718                 lu_object_put(env, &parent->do_lu);
2719                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2720                                                   "", "R", ea_off);
2721                 GOTO(out, rc);
2722         }
2723
2724         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2725                 GOTO(put, rc = -EISDIR);
2726
2727         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2728                                          ltd->ltd_index, ea_off);
2729
2730         GOTO(put, rc);
2731
2732 put:
2733         if (rc <= 0)
2734                 lu_object_put(env, &parent->do_lu);
2735         else
2736                 /* The layout EA is changed, need to be reloaded next time. */
2737                 lu_object_put_nocache(env, &parent->do_lu);
2738
2739 out:
2740         down_write(&com->lc_sem);
2741         com->lc_new_scanned++;
2742         com->lc_new_checked++;
2743         if (rc > 0) {
2744                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2745                 rc = 0;
2746         } else if (rc < 0) {
2747                 lo->ll_objs_failed_phase2++;
2748         }
2749         up_write(&com->lc_sem);
2750
2751         return rc;
2752 }
2753
2754 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2755                                     struct lfsck_component *com,
2756                                     struct lfsck_tgt_desc *ltd)
2757 {
2758         struct lfsck_layout             *lo     = com->lc_file_ram;
2759         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2760         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2761         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2762         struct ost_id                   *oi     = &info->lti_oi;
2763         struct lu_fid                   *fid    = &info->lti_fid;
2764         struct dt_object                *obj;
2765         const struct dt_it_ops          *iops;
2766         struct dt_it                    *di;
2767         int                              rc     = 0;
2768         ENTRY;
2769
2770         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2771                "scanning for OST%04x\n",
2772                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2773
2774         ostid_set_seq(oi, FID_SEQ_IDIF);
2775         ostid_set_id(oi, 0);
2776         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2777         if (rc != 0)
2778                 GOTO(log, rc);
2779
2780         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2781         if (unlikely(IS_ERR(obj)))
2782                 GOTO(log, rc = PTR_ERR(obj));
2783
2784         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2785         if (rc != 0)
2786                 GOTO(put, rc);
2787
2788         iops = &obj->do_index_ops->dio_it;
2789         di = iops->init(env, obj, 0, BYPASS_CAPA);
2790         if (IS_ERR(di))
2791                 GOTO(put, rc = PTR_ERR(di));
2792
2793         rc = iops->load(env, di, 0);
2794         if (rc == -ESRCH) {
2795                 /* -ESRCH means that the orphan OST-objects rbtree has been
2796                  * cleanup because of the OSS server restart or other errors. */
2797                 lo->ll_flags |= LF_INCOMPLETE;
2798                 GOTO(fini, rc);
2799         }
2800
2801         if (rc == 0)
2802                 rc = iops->next(env, di);
2803         else if (rc > 0)
2804                 rc = 0;
2805
2806         if (rc < 0)
2807                 GOTO(fini, rc);
2808
2809         if (rc > 0)
2810                 GOTO(fini, rc = 0);
2811
2812         do {
2813                 struct dt_key           *key;
2814                 struct lu_orphan_rec    *rec = &info->lti_rec;
2815
2816                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2817                     cfs_fail_val > 0) {
2818                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2819                         struct l_wait_info       lwi;
2820
2821                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2822                                           NULL, NULL);
2823                         l_wait_event(thread->t_ctl_waitq,
2824                                      !thread_is_running(thread),
2825                                      &lwi);
2826                 }
2827
2828                 key = iops->key(env, di);
2829                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2830                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2831                 if (rc == 0)
2832                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2833                                         &com->lc_fid_latest_scanned_phase2);
2834                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2835                         GOTO(fini, rc);
2836
2837                 lfsck_control_speed_by_self(com);
2838                 do {
2839                         rc = iops->next(env, di);
2840                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2841         } while (rc == 0);
2842
2843         GOTO(fini, rc);
2844
2845 fini:
2846         iops->put(env, di);
2847         iops->fini(env, di);
2848 put:
2849         lu_object_put(env, &obj->do_lu);
2850
2851 log:
2852         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2853                "scanning for OST%04x: rc = %d\n",
2854                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2855
2856         return rc > 0 ? 0 : rc;
2857 }
2858
2859 /* For the MDT-object with dangling reference, we need to repare the
2860  * inconsistency according to the LFSCK sponsor's requirement:
2861  *
2862  * 1) Keep the inconsistency there and report the inconsistency case,
2863  *    then give the chance to the application to find related issues,
2864  *    and the users can make the decision about how to handle it with
2865  *    more human knownledge. (by default)
2866  *
2867  * 2) Re-create the missed OST-object with the FID/owner information. */
2868 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2869                                         struct lfsck_component *com,
2870                                         struct lfsck_layout_req *llr,
2871                                         const struct lu_attr *pla)
2872 {
2873         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2874         struct filter_fid               *pfid   = &info->lti_new_pfid;
2875         struct dt_allocation_hint       *hint   = &info->lti_hint;
2876         struct lu_attr                  *cla    = &info->lti_la2;
2877         struct dt_object                *parent = llr->llr_parent->llo_obj;
2878         struct dt_object                *child  = llr->llr_child;
2879         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2880         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2881         struct thandle                  *handle;
2882         struct lu_buf                   *buf;
2883         struct lustre_handle             lh     = { 0 };
2884         int                              rc;
2885         bool                             create;
2886         ENTRY;
2887
2888         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2889                 create = true;
2890         else
2891                 create = false;
2892
2893         if (!create)
2894                 GOTO(log, rc = 1);
2895
2896         memset(cla, 0, sizeof(*cla));
2897         cla->la_uid = pla->la_uid;
2898         cla->la_gid = pla->la_gid;
2899         cla->la_mode = S_IFREG | 0666;
2900         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2901                         LA_ATIME | LA_MTIME | LA_CTIME;
2902
2903         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2904                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2905                               LCK_EX);
2906         if (rc != 0)
2907                 GOTO(log, rc);
2908
2909         handle = dt_trans_create(env, dev);
2910         if (IS_ERR(handle))
2911                 GOTO(unlock1, rc = PTR_ERR(handle));
2912
2913         hint->dah_parent = NULL;
2914         hint->dah_mode = 0;
2915         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2916         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2917         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2918          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2919          * parent MDT-object's layout EA. */
2920         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2921         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2922
2923         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2924         if (rc != 0)
2925                 GOTO(stop, rc);
2926
2927         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2928                                   LU_XATTR_CREATE, handle);
2929         if (rc != 0)
2930                 GOTO(stop, rc);
2931
2932         rc = dt_trans_start(env, dev, handle);
2933         if (rc != 0)
2934                 GOTO(stop, rc);
2935
2936         dt_read_lock(env, parent, 0);
2937         if (unlikely(lfsck_is_dead_obj(parent)))
2938                 GOTO(unlock2, rc = 1);
2939
2940         rc = dt_create(env, child, cla, hint, NULL, handle);
2941         if (rc != 0)
2942                 GOTO(unlock2, rc);
2943
2944         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2945                           handle, BYPASS_CAPA);
2946
2947         GOTO(unlock2, rc);
2948
2949 unlock2:
2950         dt_read_unlock(env, parent);
2951
2952 stop:
2953         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2954
2955 unlock1:
2956         lfsck_ibits_unlock(&lh, LCK_EX);
2957
2958 log:
2959         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2960                "reference for: parent "DFID", child "DFID", OST-index %u, "
2961                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2962                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2963                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2964                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2965                create ? "Create the lost OST-object as required" :
2966                         "Keep the MDT-object there by default", rc);
2967
2968         return rc;
2969 }
2970
2971 /* If the OST-object does not recognize the MDT-object as its parent, and
2972  * there is no other MDT-object claims as its parent, then just trust the
2973  * given MDT-object as its parent. So update the OST-object filter_fid. */
2974 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2975                                               struct lfsck_component *com,
2976                                               struct lfsck_layout_req *llr,
2977                                               const struct lu_attr *pla)
2978 {
2979         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2980         struct filter_fid               *pfid   = &info->lti_new_pfid;
2981         struct lu_attr                  *tla    = &info->lti_la3;
2982         struct dt_object                *parent = llr->llr_parent->llo_obj;
2983         struct dt_object                *child  = llr->llr_child;
2984         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2985         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2986         struct thandle                  *handle;
2987         struct lu_buf                   *buf;
2988         struct lustre_handle             lh     = { 0 };
2989         int                              rc;
2990         ENTRY;
2991
2992         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2993                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2994                               LCK_EX);
2995         if (rc != 0)
2996                 GOTO(log, rc);
2997
2998         handle = dt_trans_create(env, dev);
2999         if (IS_ERR(handle))
3000                 GOTO(unlock1, rc = PTR_ERR(handle));
3001
3002         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3003         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3004         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3005          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3006          * parent MDT-object's layout EA. */
3007         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3008         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3009
3010         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3011         if (rc != 0)
3012                 GOTO(stop, rc);
3013
3014         tla->la_valid = LA_UID | LA_GID;
3015         tla->la_uid = pla->la_uid;
3016         tla->la_gid = pla->la_gid;
3017         rc = dt_declare_attr_set(env, child, tla, handle);
3018         if (rc != 0)
3019                 GOTO(stop, rc);
3020
3021         rc = dt_trans_start(env, dev, handle);
3022         if (rc != 0)
3023                 GOTO(stop, rc);
3024
3025         dt_write_lock(env, parent, 0);
3026         if (unlikely(lfsck_is_dead_obj(parent)))
3027                 GOTO(unlock2, rc = 1);
3028
3029         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3030                           BYPASS_CAPA);
3031         if (rc != 0)
3032                 GOTO(unlock2, rc);
3033
3034         /* Get the latest parent's owner. */
3035         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3036         if (rc != 0)
3037                 GOTO(unlock2, rc);
3038
3039         tla->la_valid = LA_UID | LA_GID;
3040         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3041
3042         GOTO(unlock2, rc);
3043
3044 unlock2:
3045         dt_write_unlock(env, parent);
3046
3047 stop:
3048         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3049
3050 unlock1:
3051         lfsck_ibits_unlock(&lh, LCK_EX);
3052
3053 log:
3054         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3055                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3056                "stripe-index %u, owner %u/%u: rc = %d\n",
3057                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3058                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3059                pla->la_uid, pla->la_gid, rc);
3060
3061         return rc;
3062 }
3063
3064 /* If there are more than one MDT-objects claim as the OST-object's parent,
3065  * and the OST-object only recognizes one of them, then we need to generate
3066  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3067 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3068                                                    struct lfsck_component *com,
3069                                                    struct lfsck_layout_req *llr,
3070                                                    struct lu_attr *la,
3071                                                    struct lu_buf *buf)
3072 {
3073         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3074         struct dt_allocation_hint       *hint   = &info->lti_hint;
3075         struct dt_object_format         *dof    = &info->lti_dof;
3076         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3077         struct ost_id                   *oi     = &info->lti_oi;
3078         struct dt_object                *parent = llr->llr_parent->llo_obj;
3079         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3080         struct dt_object                *child  = NULL;
3081         struct lu_device                *d      = &cdev->dd_lu_dev;
3082         struct lu_object                *o      = NULL;
3083         struct thandle                  *handle;
3084         struct lov_mds_md_v1            *lmm;
3085         struct lov_ost_data_v1          *objs;
3086         struct lustre_handle             lh     = { 0 };
3087         struct lu_buf                    ea_buf;
3088         __u32                            magic;
3089         int                              rc;
3090         ENTRY;
3091
3092         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
3093                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
3094                               LCK_EX);
3095         if (rc != 0)
3096                 GOTO(log, rc);
3097
3098         handle = dt_trans_create(env, pdev);
3099         if (IS_ERR(handle))
3100                 GOTO(unlock1, rc = PTR_ERR(handle));
3101
3102         o = lu_object_anon(env, d, NULL);
3103         if (IS_ERR(o))
3104                 GOTO(stop, rc = PTR_ERR(o));
3105
3106         child = container_of(o, struct dt_object, do_lu);
3107         o = lu_object_locate(o->lo_header, d->ld_type);
3108         if (unlikely(o == NULL))
3109                 GOTO(stop, rc = -EINVAL);
3110
3111         child = container_of(o, struct dt_object, do_lu);
3112         la->la_valid = LA_UID | LA_GID;
3113         hint->dah_parent = NULL;
3114         hint->dah_mode = 0;
3115         dof->dof_type = DFT_REGULAR;
3116         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
3117         if (rc != 0)
3118                 GOTO(stop, rc);
3119
3120         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3121                                   LU_XATTR_REPLACE, handle);
3122         if (rc != 0)
3123                 GOTO(stop, rc);
3124
3125         rc = dt_trans_start(env, pdev, handle);
3126         if (rc != 0)
3127                 GOTO(stop, rc);
3128
3129         dt_write_lock(env, parent, 0);
3130         if (unlikely(lfsck_is_dead_obj(parent)))
3131                 GOTO(unlock2, rc = 0);
3132
3133         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
3134         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
3135                 GOTO(unlock2, rc = 0);
3136
3137         lmm = buf->lb_buf;
3138         /* Someone change layout during the LFSCK, no need to repair then. */
3139         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
3140                 GOTO(unlock2, rc = 0);
3141
3142         rc = dt_create(env, child, la, hint, dof, handle);
3143         if (rc != 0)
3144                 GOTO(unlock2, rc);
3145
3146         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3147          * been verified in lfsck_layout_verify_header() already. If some
3148          * new magic introduced in the future, then layout LFSCK needs to
3149          * be updated also. */
3150         magic = le32_to_cpu(lmm->lmm_magic);
3151         if (magic == LOV_MAGIC_V1) {
3152                 objs = &lmm->lmm_objects[0];
3153         } else {
3154                 LASSERT(magic == LOV_MAGIC_V3);
3155                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3156         }
3157
3158         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
3159         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
3160         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
3161         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
3162         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
3163         lfsck_buf_init(&ea_buf, lmm,
3164                        lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
3165                                        magic));
3166         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
3167                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3168
3169         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
3170
3171 unlock2:
3172         dt_write_unlock(env, parent);
3173
3174 stop:
3175         if (child != NULL)
3176                 lu_object_put(env, &child->do_lu);
3177
3178         dt_trans_stop(env, pdev, handle);
3179
3180 unlock1:
3181         lfsck_ibits_unlock(&lh, LCK_EX);
3182
3183 log:
3184         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
3185                "references for: parent "DFID", OST-index %u, stripe-index %u, "
3186                "owner %u/%u: rc = %d\n",
3187                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3188                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3189
3190         return rc;
3191 }
3192
3193 /* If the MDT-object and the OST-object have different owner information,
3194  * then trust the MDT-object, because the normal chown/chgrp handle order
3195  * is from MDT to OST, and it is possible that some chown/chgrp operation
3196  * is partly done. */
3197 static int lfsck_layout_repair_owner(const struct lu_env *env,
3198                                      struct lfsck_component *com,
3199                                      struct lfsck_layout_req *llr,
3200                                      struct lu_attr *pla)
3201 {
3202         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3203         struct lu_attr                  *tla    = &info->lti_la3;
3204         struct dt_object                *parent = llr->llr_parent->llo_obj;
3205         struct dt_object                *child  = llr->llr_child;
3206         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3207         struct thandle                  *handle;
3208         int                              rc;
3209         ENTRY;
3210
3211         handle = dt_trans_create(env, dev);
3212         if (IS_ERR(handle))
3213                 GOTO(log, rc = PTR_ERR(handle));
3214
3215         tla->la_uid = pla->la_uid;
3216         tla->la_gid = pla->la_gid;
3217         tla->la_valid = LA_UID | LA_GID;
3218         rc = dt_declare_attr_set(env, child, tla, handle);
3219         if (rc != 0)
3220                 GOTO(stop, rc);
3221
3222         rc = dt_trans_start(env, dev, handle);
3223         if (rc != 0)
3224                 GOTO(stop, rc);
3225
3226         /* Use the dt_object lock to serialize with destroy and attr_set. */
3227         dt_read_lock(env, parent, 0);
3228         if (unlikely(lfsck_is_dead_obj(parent)))
3229                 GOTO(unlock, rc = 1);
3230
3231         /* Get the latest parent's owner. */
3232         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3233         if (rc != 0)
3234                 GOTO(unlock, rc);
3235
3236         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3237         if (unlikely(tla->la_uid != pla->la_uid ||
3238                      tla->la_gid != pla->la_gid))
3239                 GOTO(unlock, rc = 1);
3240
3241         tla->la_valid = LA_UID | LA_GID;
3242         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3243
3244         GOTO(unlock, rc);
3245
3246 unlock:
3247         dt_read_unlock(env, parent);
3248
3249 stop:
3250         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3251
3252 log:
3253         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3254                "file owner for: parent "DFID", child "DFID", OST-index %u, "
3255                "stripe-index %u, owner %u/%u: rc = %d\n",
3256                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3257                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3258                pla->la_uid, pla->la_gid, rc);
3259
3260         return rc;
3261 }
3262
3263 /* Check whether the OST-object correctly back points to the
3264  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3265 static int lfsck_layout_check_parent(const struct lu_env *env,
3266                                      struct lfsck_component *com,
3267                                      struct dt_object *parent,
3268                                      const struct lu_fid *pfid,
3269                                      const struct lu_fid *cfid,
3270                                      const struct lu_attr *pla,
3271                                      const struct lu_attr *cla,
3272                                      struct lfsck_layout_req *llr,
3273                                      struct lu_buf *lov_ea, __u32 idx)
3274 {
3275         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3276         struct lu_buf                   *buf    = &info->lti_big_buf;
3277         struct dt_object                *tobj;
3278         struct lov_mds_md_v1            *lmm;
3279         struct lov_ost_data_v1          *objs;
3280         int                              rc;
3281         int                              i;
3282         __u32                            magic;
3283         __u16                            count;
3284         ENTRY;
3285
3286         if (fid_is_zero(pfid)) {
3287                 /* client never wrote. */
3288                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3289                         if (unlikely(cla->la_uid != pla->la_uid ||
3290                                      cla->la_gid != pla->la_gid))
3291                                 RETURN (LLIT_INCONSISTENT_OWNER);
3292
3293                         RETURN(0);
3294                 }
3295
3296                 RETURN(LLIT_UNMATCHED_PAIR);
3297         }
3298
3299         if (unlikely(!fid_is_sane(pfid)))
3300                 RETURN(LLIT_UNMATCHED_PAIR);
3301
3302         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3303                 if (llr->llr_lov_idx == idx)
3304                         RETURN(0);
3305
3306                 RETURN(LLIT_UNMATCHED_PAIR);
3307         }
3308
3309         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3310         if (IS_ERR(tobj))
3311                 RETURN(PTR_ERR(tobj));
3312
3313         dt_read_lock(env, tobj, 0);
3314         if (dt_object_exists(tobj) == 0 ||
3315             lfsck_is_dead_obj(tobj))
3316                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3317
3318         if (!S_ISREG(lfsck_object_type(tobj)))
3319                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3320
3321         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3322          * remote one on another MDT. Then check whether the given OST-object
3323          * is in such layout. If yes, it is multiple referenced, otherwise it
3324          * is unmatched referenced case. */
3325         rc = lfsck_layout_get_lovea(env, tobj, buf);
3326         if (rc == 0 || rc == -ENOENT)
3327                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3328
3329         if (rc < 0)
3330                 GOTO(out, rc);
3331
3332         lmm = buf->lb_buf;
3333         magic = le32_to_cpu(lmm->lmm_magic);
3334         if (magic == LOV_MAGIC_V1) {
3335                 objs = &lmm->lmm_objects[0];
3336         } else {
3337                 LASSERT(magic == LOV_MAGIC_V3);
3338                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3339         }
3340
3341         count = le16_to_cpu(lmm->lmm_stripe_count);
3342         for (i = 0; i < count; i++, objs++) {
3343                 struct lu_fid           *tfid   = &info->lti_fid2;
3344                 struct ost_id           *oi     = &info->lti_oi;
3345                 __u32                    idx2;
3346
3347                 if (lovea_slot_is_dummy(objs))
3348                         continue;
3349
3350                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3351                 idx2 = le32_to_cpu(objs->l_ost_idx);
3352                 rc = ostid_to_fid(tfid, oi, idx2);
3353                 if (rc != 0) {
3354                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3355                                "invalid layout EA at the slot %d, index %u\n",
3356                                lfsck_lfsck2name(com->lc_lfsck),
3357                                PFID(pfid), i, idx2);
3358
3359                         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3360                 }
3361
3362                 if (lu_fid_eq(cfid, tfid)) {
3363                         *lov_ea = *buf;
3364
3365                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3366                 }
3367         }
3368
3369         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3370
3371 out:
3372         dt_read_unlock(env, tobj);
3373         lfsck_object_put(env, tobj);
3374
3375         return rc;
3376 }
3377
3378 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3379                                              struct lfsck_component *com,
3380                                              struct lfsck_layout_req *llr)
3381 {
3382         struct lfsck_layout                  *lo     = com->lc_file_ram;
3383         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3384         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3385         struct lu_fid                        *pfid   = &info->lti_fid;
3386         struct lu_buf                         buf    = { 0 };
3387         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3388         struct dt_object                     *child  = llr->llr_child;
3389         struct lu_attr                       *pla    = &info->lti_la;
3390         struct lu_attr                       *cla    = &info->lti_la2;
3391         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3392         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3393         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3394         __u32                                 idx    = 0;
3395         int                                   rc;
3396         ENTRY;
3397
3398         if (unlikely(lfsck_is_dead_obj(parent)))
3399                 RETURN(0);
3400
3401         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3402         if (rc != 0)
3403                 GOTO(out, rc);
3404
3405         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3406         if (rc == -ENOENT) {
3407                 if (unlikely(lfsck_is_dead_obj(parent)))
3408                         RETURN(0);
3409
3410                 type = LLIT_DANGLING;
3411                 goto repair;
3412         }
3413
3414         if (rc != 0)
3415                 GOTO(out, rc);
3416
3417         lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3418         rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3419         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3420                      rc != sizeof(struct filter_fid))) {
3421                 type = LLIT_UNMATCHED_PAIR;
3422                 goto repair;
3423         }
3424
3425         if (rc < 0 && rc != -ENODATA)
3426                 GOTO(out, rc);
3427
3428         if (rc == -ENODATA) {
3429                 fid_zero(pfid);
3430         } else {
3431                 fid_le_to_cpu(pfid, &pea->ff_parent);
3432                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3433                  * real parent MDT-object's FID::f_ver, instead it is the
3434                  * OST-object index in its parent MDT-object's layout EA. */
3435                 idx = pfid->f_stripe_idx;
3436                 pfid->f_ver = 0;
3437         }
3438
3439         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3440                                        lu_object_fid(&child->do_lu),
3441                                        pla, cla, llr, &buf, idx);
3442         if (rc > 0) {
3443                 type = rc;
3444                 goto repair;
3445         }
3446
3447         if (rc < 0)
3448                 GOTO(out, rc);
3449
3450         if (unlikely(cla->la_uid != pla->la_uid ||
3451                      cla->la_gid != pla->la_gid)) {
3452                 type = LLIT_INCONSISTENT_OWNER;
3453                 goto repair;
3454         }
3455
3456 repair:
3457         if (bk->lb_param & LPF_DRYRUN) {
3458                 if (type != LLIT_NONE)
3459                         GOTO(out, rc = 1);
3460                 else
3461                         GOTO(out, rc = 0);
3462         }
3463
3464         switch (type) {
3465         case LLIT_DANGLING:
3466                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3467                 break;
3468         case LLIT_UNMATCHED_PAIR:
3469                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3470                 break;
3471         case LLIT_MULTIPLE_REFERENCED:
3472                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3473                                                              pla, &buf);
3474                 break;
3475         case LLIT_INCONSISTENT_OWNER:
3476                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3477                 break;
3478         default:
3479                 rc = 0;
3480                 break;
3481         }
3482
3483         GOTO(out, rc);
3484
3485 out:
3486         down_write(&com->lc_sem);
3487         if (rc < 0) {
3488                 struct lfsck_layout_master_data *llmd = com->lc_data;
3489
3490                 if (unlikely(llmd->llmd_exit)) {
3491                         rc = 0;
3492                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3493                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3494                            rc == -EHOSTUNREACH) {
3495                         /* If cannot touch the target server,
3496                          * mark the LFSCK as INCOMPLETE. */
3497                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3498                                "talk with OST %x: rc = %d\n",
3499                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3500                         lo->ll_flags |= LF_INCOMPLETE;
3501                         lo->ll_objs_skipped++;
3502                         rc = 0;
3503                 } else {
3504                         lfsck_layout_record_failure(env, lfsck, lo);
3505                 }
3506         } else if (rc > 0) {
3507                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3508                          "unknown type = %d\n", type);
3509
3510                 lo->ll_objs_repaired[type - 1]++;
3511                 if (bk->lb_param & LPF_DRYRUN &&
3512                     unlikely(lo->ll_pos_first_inconsistent == 0))
3513                         lo->ll_pos_first_inconsistent =
3514                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3515                                                         lfsck->li_di_oit);
3516         }
3517         up_write(&com->lc_sem);
3518
3519         return rc;
3520 }
3521
3522 static int lfsck_layout_assistant(void *args)
3523 {
3524         struct lfsck_thread_args        *lta     = args;
3525         struct lu_env                   *env     = &lta->lta_env;
3526         struct lfsck_component          *com     = lta->lta_com;
3527         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3528         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3529         struct lfsck_position           *pos     = &com->lc_pos_start;
3530         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3531         struct lfsck_request            *lr      = &info->lti_lr;
3532         struct lfsck_layout_master_data *llmd    = com->lc_data;
3533         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3534         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3535         struct lfsck_layout_req         *llr;
3536         struct l_wait_info               lwi     = { 0 };
3537         int                              rc      = 0;
3538         int                              rc1     = 0;
3539         ENTRY;
3540
3541         memset(lr, 0, sizeof(*lr));
3542         lr->lr_event = LE_START;
3543         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3544                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
3545         lr->lr_speed = bk->lb_speed_limit;
3546         lr->lr_version = bk->lb_version;
3547         lr->lr_param = bk->lb_param;
3548         lr->lr_async_windows = bk->lb_async_windows;
3549         lr->lr_flags = LEF_TO_OST;
3550         if (pos->lp_oit_cookie <= 1)
3551                 lr->lr_param |= LPF_RESET;
3552
3553         rc = lfsck_layout_master_notify_others(env, com, lr);
3554         if (rc != 0) {
3555                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to notify "
3556                        "others for LFSCK start: rc = %d\n",
3557                        lfsck_lfsck2name(lfsck), rc);
3558                 GOTO(fini, rc);
3559         }
3560
3561         spin_lock(&llmd->llmd_lock);
3562         thread_set_flags(athread, SVC_RUNNING);
3563         spin_unlock(&llmd->llmd_lock);
3564         wake_up_all(&mthread->t_ctl_waitq);
3565
3566         while (1) {
3567                 while (!list_empty(&llmd->llmd_req_list)) {
3568                         bool wakeup = false;
3569
3570                         if (unlikely(llmd->llmd_exit ||
3571                                      !thread_is_running(mthread)))
3572                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3573
3574                         llr = list_entry(llmd->llmd_req_list.next,
3575                                          struct lfsck_layout_req,
3576                                          llr_list);
3577                         /* Only the lfsck_layout_assistant thread itself can
3578                          * remove the "llr" from the head of the list, LFSCK
3579                          * engine thread only inserts other new "lld" at the
3580                          * end of the list. So it is safe to handle current
3581                          * "llr" without the spin_lock. */
3582                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3583                         spin_lock(&llmd->llmd_lock);
3584                         list_del_init(&llr->llr_list);
3585                         llmd->llmd_prefetched--;
3586                         /* Wake up the main engine thread only when the list
3587                          * is empty or half of the prefetched items have been
3588                          * handled to avoid too frequent thread schedule. */
3589                         if (llmd->llmd_prefetched == 0 ||
3590                             (bk->lb_async_windows != 0 &&
3591                              bk->lb_async_windows / 2 ==
3592                              llmd->llmd_prefetched))
3593                                 wakeup = true;
3594                         spin_unlock(&llmd->llmd_lock);
3595                         if (wakeup)
3596                                 wake_up_all(&mthread->t_ctl_waitq);
3597
3598                         lfsck_layout_req_fini(env, llr);
3599                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3600                                 GOTO(cleanup1, rc);
3601                 }
3602
3603                 l_wait_event(athread->t_ctl_waitq,
3604                              !lfsck_layout_req_empty(llmd) ||
3605                              llmd->llmd_exit ||
3606                              llmd->llmd_to_post ||
3607                              llmd->llmd_to_double_scan,
3608                              &lwi);
3609
3610                 if (unlikely(llmd->llmd_exit))
3611                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3612
3613                 if (!list_empty(&llmd->llmd_req_list))
3614                         continue;
3615
3616                 if (llmd->llmd_to_post) {
3617                         llmd->llmd_to_post = 0;
3618                         LASSERT(llmd->llmd_post_result > 0);
3619
3620                         memset(lr, 0, sizeof(*lr));
3621                         lr->lr_event = LE_PHASE1_DONE;
3622                         lr->lr_status = llmd->llmd_post_result;
3623                         rc = lfsck_layout_master_notify_others(env, com, lr);
3624                         if (rc != 0)
3625                                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant "
3626                                        "failed to notify others for LFSCK "
3627                                        "post: rc = %d\n",
3628                                        lfsck_lfsck2name(lfsck), rc);
3629
3630                         /* Wakeup the master engine to go ahead. */
3631                         wake_up_all(&mthread->t_ctl_waitq);
3632                 }
3633
3634                 if (llmd->llmd_to_double_scan) {
3635                         llmd->llmd_to_double_scan = 0;
3636                         atomic_inc(&lfsck->li_double_scan_count);
3637                         llmd->llmd_in_double_scan = 1;
3638                         wake_up_all(&mthread->t_ctl_waitq);
3639
3640                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 "
3641                                "scan start\n", lfsck_lfsck2name(lfsck));
3642
3643                         com->lc_new_checked = 0;
3644                         com->lc_new_scanned = 0;
3645                         com->lc_time_last_checkpoint = cfs_time_current();
3646                         com->lc_time_next_checkpoint =
3647                                 com->lc_time_last_checkpoint +
3648                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3649
3650                         /* flush all async updating before handling orphan. */
3651                         dt_sync(env, lfsck->li_next);
3652
3653                         while (llmd->llmd_in_double_scan) {
3654                                 struct lfsck_tgt_descs  *ltds =
3655                                                         &lfsck->li_ost_descs;
3656                                 struct lfsck_tgt_desc   *ltd;
3657
3658                                 rc = lfsck_layout_master_query_others(env, com);
3659                                 if (lfsck_layout_master_to_orphan(llmd))
3660                                         goto orphan;
3661
3662                                 if (rc < 0)
3663                                         GOTO(cleanup2, rc);
3664
3665                                 /* Pull LFSCK status on related targets once
3666                                  * per 30 seconds if we are not notified. */
3667                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3668                                                            cfs_time_seconds(1),
3669                                                            NULL, NULL);
3670                                 rc = l_wait_event(athread->t_ctl_waitq,
3671                                         lfsck_layout_master_to_orphan(llmd) ||
3672                                         llmd->llmd_exit ||
3673                                         !thread_is_running(mthread),
3674                                         &lwi);
3675
3676                                 if (unlikely(llmd->llmd_exit ||
3677                                              !thread_is_running(mthread)))
3678                                         GOTO(cleanup2, rc = 0);
3679
3680                                 if (rc == -ETIMEDOUT)
3681                                         continue;
3682
3683                                 if (rc < 0)
3684                                         GOTO(cleanup2, rc);
3685
3686 orphan:
3687                                 spin_lock(&ltds->ltd_lock);
3688                                 while (!list_empty(
3689                                                 &llmd->llmd_ost_phase2_list)) {
3690                                         ltd = list_entry(
3691                                               llmd->llmd_ost_phase2_list.next,
3692                                               struct lfsck_tgt_desc,
3693                                               ltd_layout_phase_list);
3694                                         list_del_init(
3695                                                 &ltd->ltd_layout_phase_list);
3696                                         spin_unlock(&ltds->ltd_lock);
3697
3698                                         if (bk->lb_param & LPF_ALL_TGT) {
3699                                                 rc = lfsck_layout_scan_orphan(
3700                                                                 env, com, ltd);
3701                                                 if (rc != 0 &&
3702                                                     bk->lb_param & LPF_FAILOUT)
3703                                                         GOTO(cleanup2, rc);
3704                                         }
3705
3706                                         if (unlikely(llmd->llmd_exit ||
3707                                                 !thread_is_running(mthread)))
3708                                                 GOTO(cleanup2, rc = 0);
3709
3710                                         spin_lock(&ltds->ltd_lock);
3711                                 }
3712
3713                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3714                                         spin_unlock(&ltds->ltd_lock);
3715                                         GOTO(cleanup2, rc = 1);
3716                                 }
3717                                 spin_unlock(&ltds->ltd_lock);
3718                         }
3719                 }
3720         }
3721
3722 cleanup1:
3723         /* Cleanup the unfinished requests. */
3724         spin_lock(&llmd->llmd_lock);
3725         if (rc < 0)
3726                 llmd->llmd_assistant_status = rc;
3727
3728         while (!list_empty(&llmd->llmd_req_list)) {
3729                 llr = list_entry(llmd->llmd_req_list.next,
3730                                  struct lfsck_layout_req,
3731                                  llr_list);
3732                 list_del_init(&llr->llr_list);
3733                 llmd->llmd_prefetched--;
3734                 spin_unlock(&llmd->llmd_lock);
3735                 lfsck_layout_req_fini(env, llr);
3736                 spin_lock(&llmd->llmd_lock);
3737         }
3738         spin_unlock(&llmd->llmd_lock);
3739
3740         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3741                  llmd->llmd_prefetched);
3742
3743 cleanup2:
3744         memset(lr, 0, sizeof(*lr));
3745         if (rc > 0) {
3746                 lr->lr_event = LE_PHASE2_DONE;
3747                 lr->lr_status = rc;
3748         } else if (rc == 0) {
3749                 if (lfsck->li_flags & LPF_ALL_TGT) {
3750                         lr->lr_event = LE_STOP;
3751                         lr->lr_status = LS_STOPPED;
3752                 } else {
3753                         lr->lr_event = LE_PEER_EXIT;
3754                         switch (lfsck->li_status) {
3755                         case LS_PAUSED:
3756                         case LS_CO_PAUSED:
3757                                 lr->lr_status = LS_CO_PAUSED;
3758                                 break;
3759                         case LS_STOPPED:
3760                         case LS_CO_STOPPED:
3761                                 lr->lr_status = LS_CO_STOPPED;
3762                                 break;
3763                         default:
3764                                 CDEBUG(D_LFSCK, "%s: unknown status: rc = %d\n",
3765                                        lfsck_lfsck2name(lfsck),
3766                                        lfsck->li_status);
3767                                 lr->lr_status = LS_CO_FAILED;
3768                                 break;
3769                         }
3770                 }
3771         } else {
3772                 if (lfsck->li_flags & LPF_ALL_TGT) {
3773                         lr->lr_event = LE_STOP;
3774                         lr->lr_status = LS_FAILED;
3775                 } else {
3776                         lr->lr_event = LE_PEER_EXIT;
3777                         lr->lr_status = LS_CO_FAILED;
3778                 }
3779         }
3780
3781         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3782         if (rc1 != 0) {
3783                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to "
3784                        "notify others for LFSCK quit: rc = %d\n",
3785                        lfsck_lfsck2name(lfsck), rc1);
3786                 rc = rc1;
3787         }
3788
3789         /* flush all async updating before exit. */
3790         dt_sync(env, lfsck->li_next);
3791
3792         /* Under force exit case, some requests may be just freed without
3793          * verification, those objects should be re-handled when next run.
3794          * So not update the on-disk tracing file under such case. */
3795         if (llmd->llmd_in_double_scan) {
3796                 struct lfsck_layout *lo = com->lc_file_ram;
3797
3798                 if (!llmd->llmd_exit)
3799                         rc1 = lfsck_layout_double_scan_result(env, com, rc);
3800
3801                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 scan "
3802                        "finished, status %d: rc = %d\n",
3803                        lfsck_lfsck2name(lfsck), lo->ll_status, rc1);
3804         }
3805
3806 fini:
3807         if (llmd->llmd_in_double_scan)
3808                 atomic_dec(&lfsck->li_double_scan_count);
3809
3810         spin_lock(&llmd->llmd_lock);
3811         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3812         thread_set_flags(athread, SVC_STOPPED);
3813         wake_up_all(&mthread->t_ctl_waitq);
3814         spin_unlock(&llmd->llmd_lock);
3815         lfsck_thread_args_fini(lta);
3816
3817         return rc;
3818 }
3819
3820 static int
3821 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3822                                    struct ptlrpc_request *req,
3823                                    void *args, int rc)
3824 {
3825         struct lfsck_layout_slave_async_args *llsaa = args;
3826         struct obd_export                    *exp   = llsaa->llsaa_exp;
3827         struct lfsck_component               *com   = llsaa->llsaa_com;
3828         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3829         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3830         struct lfsck_reply                   *lr    = NULL;
3831         bool                                  done  = false;
3832
3833         if (rc != 0) {
3834                 /* It is quite probably caused by target crash,
3835                  * to make the LFSCK can go ahead, assume that
3836                  * the target finished the LFSCK prcoessing. */
3837                 done = true;
3838         } else {
3839                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3840                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3841                     lr->lr_status != LS_SCANNING_PHASE2)
3842                         done = true;
3843         }
3844
3845         if (done) {
3846                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3847                        "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3848                        llst->llst_index, lr != NULL ? lr->lr_status : rc);
3849
3850                 lfsck_layout_llst_del(llsd, llst);
3851         }
3852
3853         lfsck_layout_llst_put(llst);
3854         lfsck_component_put(env, com);
3855         class_export_put(exp);
3856
3857         return 0;
3858 }
3859
3860 static int lfsck_layout_async_query(const struct lu_env *env,
3861                                     struct lfsck_component *com,
3862                                     struct obd_export *exp,
3863                                     struct lfsck_layout_slave_target *llst,
3864                                     struct lfsck_request *lr,
3865                                     struct ptlrpc_request_set *set)
3866 {
3867         struct lfsck_layout_slave_async_args *llsaa;
3868         struct ptlrpc_request                *req;
3869         struct lfsck_request                 *tmp;
3870         int                                   rc;
3871         ENTRY;
3872
3873         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3874         if (req == NULL)
3875                 RETURN(-ENOMEM);
3876
3877         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3878         if (rc != 0) {
3879                 ptlrpc_request_free(req);
3880                 RETURN(rc);
3881         }
3882
3883         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3884         *tmp = *lr;
3885         ptlrpc_request_set_replen(req);
3886
3887         llsaa = ptlrpc_req_async_args(req);
3888         llsaa->llsaa_exp = exp;
3889         llsaa->llsaa_com = lfsck_component_get(com);
3890         llsaa->llsaa_llst = llst;
3891         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3892         ptlrpc_set_add_req(set, req);
3893
3894         RETURN(0);
3895 }
3896
3897 static int lfsck_layout_async_notify(const struct lu_env *env,
3898                                      struct obd_export *exp,
3899                                      struct lfsck_request *lr,
3900                                      struct ptlrpc_request_set *set)
3901 {
3902         struct ptlrpc_request   *req;
3903         struct lfsck_request    *tmp;
3904         int                      rc;
3905         ENTRY;
3906
3907         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3908         if (req == NULL)
3909                 RETURN(-ENOMEM);
3910
3911         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3912         if (rc != 0) {
3913                 ptlrpc_request_free(req);
3914                 RETURN(rc);
3915         }
3916
3917         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3918         *tmp = *lr;
3919         ptlrpc_request_set_replen(req);
3920         ptlrpc_set_add_req(set, req);
3921
3922         RETURN(0);
3923 }
3924
3925 static int
3926 lfsck_layout_slave_query_master(const struct lu_env *env,
3927                                 struct lfsck_component *com)
3928 {
3929         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3930         struct lfsck_instance            *lfsck = com->lc_lfsck;
3931         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3932         struct lfsck_layout_slave_target *llst;
3933         struct obd_export                *exp;
3934         struct ptlrpc_request_set        *set;
3935         int                               rc    = 0;
3936         int                               rc1   = 0;
3937         ENTRY;
3938
3939         set = ptlrpc_prep_set();
3940         if (set == NULL)
3941                 GOTO(log, rc = -ENOMEM);
3942
3943         memset(lr, 0, sizeof(*lr));
3944         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3945         lr->lr_event = LE_QUERY;
3946         lr->lr_active = LFSCK_TYPE_LAYOUT;
3947
3948         llsd->llsd_touch_gen++;
3949         spin_lock(&llsd->llsd_lock);
3950         while (!list_empty(&llsd->llsd_master_list)) {
3951                 llst = list_entry(llsd->llsd_master_list.next,
3952                                   struct lfsck_layout_slave_target,
3953                                   llst_list);
3954                 if (llst->llst_gen == llsd->llsd_touch_gen)
3955                         break;
3956
3957                 llst->llst_gen = llsd->llsd_touch_gen;
3958                 list_move_tail(&llst->llst_list,
3959                                &llsd->llsd_master_list);
3960                 atomic_inc(&llst->llst_ref);
3961                 spin_unlock(&llsd->llsd_lock);
3962
3963                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3964                                                llst->llst_index);
3965                 if (exp == NULL) {
3966                         lfsck_layout_llst_del(llsd, llst);
3967                         lfsck_layout_llst_put(llst);
3968                         spin_lock(&llsd->llsd_lock);
3969                         continue;
3970                 }
3971
3972                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3973                 if (rc != 0) {
3974                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3975                                "query %s for layout: rc = %d\n",
3976                                lfsck_lfsck2name(lfsck),
3977                                exp->exp_obd->obd_name, rc);
3978
3979                         rc1 = rc;
3980                         lfsck_layout_llst_put(llst);
3981                         class_export_put(exp);
3982                 }
3983                 spin_lock(&llsd->llsd_lock);
3984         }
3985         spin_unlock(&llsd->llsd_lock);
3986
3987         rc = ptlrpc_set_wait(set);
3988         ptlrpc_set_destroy(set);
3989
3990         GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
3991
3992 log:
3993         CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
3994                lfsck_lfsck2name(com->lc_lfsck), rc);
3995
3996         return rc;
3997 }
3998
3999 static void
4000 lfsck_layout_slave_notify_master(const struct lu_env *env,
4001                                  struct lfsck_component *com,
4002                                  enum lfsck_events event, int result)
4003 {
4004         struct lfsck_instance            *lfsck = com->lc_lfsck;
4005         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
4006         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
4007         struct lfsck_layout_slave_target *llst;
4008         struct obd_export                *exp;
4009         struct ptlrpc_request_set        *set;
4010         int                               rc;
4011         ENTRY;
4012
4013         CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
4014                lfsck_lfsck2name(com->lc_lfsck));
4015
4016         set = ptlrpc_prep_set();
4017         if (set == NULL)
4018                 RETURN_EXIT;
4019
4020         memset(lr, 0, sizeof(*lr));
4021         lr->lr_event = event;
4022         lr->lr_flags = LEF_FROM_OST;
4023         lr->lr_status = result;
4024         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4025         lr->lr_active = LFSCK_TYPE_LAYOUT;
4026         llsd->llsd_touch_gen++;
4027         spin_lock(&llsd->llsd_lock);
4028         while (!list_empty(&llsd->llsd_master_list)) {
4029                 llst = list_entry(llsd->llsd_master_list.next,
4030                                   struct lfsck_layout_slave_target,
4031                                   llst_list);
4032                 if (llst->llst_gen == llsd->llsd_touch_gen)
4033                         break;
4034
4035                 llst->llst_gen = llsd->llsd_touch_gen;
4036                 list_move_tail(&llst->llst_list,
4037                                &llsd->llsd_master_list);
4038                 atomic_inc(&llst->llst_ref);
4039                 spin_unlock(&llsd->llsd_lock);
4040
4041                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
4042                                                llst->llst_index);
4043                 if (exp == NULL) {
4044                         lfsck_layout_llst_del(llsd, llst);
4045                         lfsck_layout_llst_put(llst);
4046                         spin_lock(&llsd->llsd_lock);
4047                         continue;
4048                 }
4049
4050                 rc = lfsck_layout_async_notify(env, exp, lr, set);
4051                 if (rc != 0)
4052                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4053                                "notify %s for layout: rc = %d\n",
4054                                lfsck_lfsck2name(lfsck),
4055                                exp->exp_obd->obd_name, rc);
4056
4057                 lfsck_layout_llst_put(llst);
4058                 class_export_put(exp);
4059                 spin_lock(&llsd->llsd_lock);
4060         }
4061         spin_unlock(&llsd->llsd_lock);
4062
4063         ptlrpc_set_wait(set);
4064         ptlrpc_set_destroy(set);
4065
4066         RETURN_EXIT;
4067 }
4068
4069 /*
4070  * \ret -ENODATA: unrecognized stripe
4071  * \ret = 0     : recognized stripe
4072  * \ret < 0     : other failures
4073  */
4074 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
4075                                            struct lfsck_component *com,
4076                                            struct lu_fid *cfid,
4077                                            struct lu_fid *pfid)
4078 {
4079         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4080         struct lu_buf                   *buf    = &info->lti_big_buf;
4081         struct ost_id                   *oi     = &info->lti_oi;
4082         struct dt_object                *obj;
4083         struct lov_mds_md_v1            *lmm;
4084         struct lov_ost_data_v1          *objs;
4085         __u32                            idx    = pfid->f_stripe_idx;
4086         __u32                            magic;
4087         int                              rc     = 0;
4088         int                              i;
4089         __u16                            count;
4090         ENTRY;
4091
4092         pfid->f_ver = 0;
4093         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
4094         if (IS_ERR(obj))
4095                 RETURN(PTR_ERR(obj));
4096
4097         dt_read_lock(env, obj, 0);
4098         if (unlikely(dt_object_exists(obj) == 0 ||
4099                      lfsck_is_dead_obj(obj)))
4100                 GOTO(unlock, rc = -ENOENT);
4101
4102         if (!S_ISREG(lfsck_object_type(obj)))
4103                 GOTO(unlock, rc = -ENODATA);
4104
4105         rc = lfsck_layout_get_lovea(env, obj, buf);
4106         if (rc < 0)
4107                 GOTO(unlock, rc);
4108
4109         if (rc == 0)
4110                 GOTO(unlock, rc = -ENODATA);
4111
4112         lmm = buf->lb_buf;
4113         rc = lfsck_layout_verify_header(lmm);
4114         if (rc != 0)
4115                 GOTO(unlock, rc);
4116
4117         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4118          * been verified in lfsck_layout_verify_header() already. If some
4119          * new magic introduced in the future, then layout LFSCK needs to
4120          * be updated also. */
4121         magic = le32_to_cpu(lmm->lmm_magic);
4122         if (magic == LOV_MAGIC_V1) {
4123                 objs = &lmm->lmm_objects[0];
4124         } else {
4125                 LASSERT(magic == LOV_MAGIC_V3);
4126                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4127         }
4128
4129         fid_to_ostid(cfid, oi);
4130         count = le16_to_cpu(lmm->lmm_stripe_count);
4131         for (i = 0; i < count; i++, objs++) {
4132                 struct ost_id oi2;
4133
4134                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
4135                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
4136                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
4137         }
4138
4139         GOTO(unlock, rc = -ENODATA);
4140
4141 unlock:
4142         dt_read_unlock(env, obj);
4143         lu_object_put(env, &obj->do_lu);
4144
4145         return rc;
4146 }
4147
4148 /*
4149  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
4150  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
4151  * layout EA from MDT to OST. On one hand, the OST no need to understand
4152  * the layout EA structure; on the other hand, it may cause trouble when
4153  * transfer large layout EA from MDT to OST via normal OUT RPC.
4154  *
4155  * \ret > 0: unrecognized stripe
4156  * \ret = 0: recognized stripe
4157  * \ret < 0: other failures
4158  */
4159 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
4160                                           struct lfsck_component *com,
4161                                           struct lu_fid *cfid,
4162                                           struct lu_fid *pfid)
4163 {
4164         struct lfsck_instance    *lfsck  = com->lc_lfsck;
4165         struct obd_device        *obd    = lfsck->li_obd;
4166         struct seq_server_site   *ss     =
4167                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4168         struct obd_export        *exp    = NULL;
4169         struct ptlrpc_request    *req    = NULL;
4170         struct lfsck_request     *lr;
4171         struct lu_seq_range       range  = { 0 };
4172         int                       rc     = 0;
4173         ENTRY;
4174
4175         if (unlikely(fid_is_idif(pfid)))
4176                 RETURN(1);
4177
4178         fld_range_set_any(&range);
4179         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
4180         if (rc != 0)
4181                 RETURN(rc == -ENOENT ? 1 : rc);
4182
4183         if (unlikely(!fld_range_is_mdt(&range)))
4184                 RETURN(1);
4185
4186         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
4187         if (unlikely(exp == NULL))
4188                 RETURN(1);
4189
4190         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
4191                 GOTO(out, rc = -EOPNOTSUPP);
4192
4193         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
4194         if (req == NULL)
4195                 GOTO(out, rc = -ENOMEM);
4196
4197         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
4198         if (rc != 0) {
4199                 ptlrpc_request_free(req);
4200
4201                 GOTO(out, rc);
4202         }
4203
4204         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
4205         memset(lr, 0, sizeof(*lr));
4206         lr->lr_event = LE_PAIRS_VERIFY;
4207         lr->lr_active = LFSCK_TYPE_LAYOUT;
4208         lr->lr_fid = *cfid; /* OST-object itself FID. */
4209         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
4210
4211         ptlrpc_request_set_replen(req);
4212         rc = ptlrpc_queue_wait(req);
4213         ptlrpc_req_finished(req);
4214
4215         if (rc == -ENOENT || rc == -ENODATA)
4216                 rc = 1;
4217
4218         GOTO(out, rc);
4219
4220 out:
4221         if (exp != NULL)
4222                 class_export_put(exp);
4223
4224         return rc;
4225 }
4226
4227 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
4228                                           struct lfsck_component *com,
4229                                           struct lfsck_request *lr)
4230 {
4231         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4232         struct filter_fid               *ff     = &info->lti_new_pfid;
4233         struct lu_buf                   *buf;
4234         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
4235         struct dt_object                *obj;
4236         struct thandle                  *th     = NULL;
4237         int                              rc     = 0;
4238         ENTRY;
4239
4240         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
4241         if (IS_ERR(obj))
4242                 GOTO(log, rc = PTR_ERR(obj));
4243
4244         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
4245         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4246         dt_write_lock(env, obj, 0);
4247         if (unlikely(dt_object_exists(obj) == 0 ||
4248                      lfsck_is_dead_obj(obj)))
4249                 GOTO(unlock, rc = 0);
4250
4251         th = dt_trans_create(env, dev);
4252         if (IS_ERR(th))
4253                 GOTO(unlock, rc = PTR_ERR(th));
4254
4255         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4256         if (rc != 0)
4257                 GOTO(stop, rc);
4258
4259         rc = dt_trans_start_local(env, dev, th);
4260         if (rc != 0)
4261                 GOTO(stop, rc);
4262
4263         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4264
4265         GOTO(stop, rc);
4266
4267 stop:
4268         dt_trans_stop(env, dev, th);
4269
4270 unlock:
4271         dt_write_unlock(env, obj);
4272         lu_object_put(env, &obj->do_lu);
4273
4274 log:
4275         CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
4276                ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
4277                PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
4278
4279         return rc;
4280 }
4281
4282 /* layout APIs */
4283
4284 static int lfsck_layout_reset(const struct lu_env *env,
4285                               struct lfsck_component *com, bool init)
4286 {
4287         struct lfsck_layout     *lo    = com->lc_file_ram;
4288         int                      rc;
4289
4290         down_write(&com->lc_sem);
4291         if (init) {
4292                 memset(lo, 0, com->lc_file_size);
4293         } else {
4294                 __u32 count = lo->ll_success_count;
4295                 __u64 last_time = lo->ll_time_last_complete;
4296
4297                 memset(lo, 0, com->lc_file_size);
4298                 lo->ll_success_count = count;
4299                 lo->ll_time_last_complete = last_time;
4300         }
4301
4302         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4303         lo->ll_status = LS_INIT;
4304
4305         rc = lfsck_layout_store(env, com);
4306         up_write(&com->lc_sem);
4307
4308         CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
4309                lfsck_lfsck2name(com->lc_lfsck), rc);
4310
4311         return rc;
4312 }
4313
4314 static void lfsck_layout_fail(const struct lu_env *env,
4315                               struct lfsck_component *com, bool new_checked)
4316 {
4317         struct lfsck_layout *lo = com->lc_file_ram;
4318
4319         down_write(&com->lc_sem);
4320         if (new_checked)
4321                 com->lc_new_checked++;
4322         lfsck_layout_record_failure(env, com->lc_lfsck, lo);
4323         up_write(&com->lc_sem);
4324 }
4325
4326 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4327                                           struct lfsck_component *com, bool init)
4328 {
4329         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4330         struct lfsck_layout             *lo      = com->lc_file_ram;
4331         struct lfsck_layout_master_data *llmd    = com->lc_data;
4332         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4333         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4334         struct l_wait_info               lwi     = { 0 };
4335         int                              rc;
4336
4337         if (com->lc_new_checked == 0 && !init)
4338                 return 0;
4339
4340         l_wait_event(mthread->t_ctl_waitq,
4341                      list_empty(&llmd->llmd_req_list) ||
4342                      !thread_is_running(mthread) ||
4343                      thread_is_stopped(athread),
4344                      &lwi);
4345
4346         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4347                 return 0;
4348
4349         down_write(&com->lc_sem);
4350         if (init) {
4351                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4352         } else {
4353                 lo->ll_pos_last_checkpoint =
4354                                         lfsck->li_pos_current.lp_oit_cookie;
4355                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4356                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4357                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4358                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4359                 com->lc_new_checked = 0;
4360         }
4361
4362         rc = lfsck_layout_store(env, com);
4363         up_write(&com->lc_sem);
4364
4365         CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
4366                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4367                lfsck->li_pos_current.lp_oit_cookie, rc);
4368
4369         return rc;
4370 }
4371
4372 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4373                                          struct lfsck_component *com, bool init)
4374 {
4375         struct lfsck_instance   *lfsck = com->lc_lfsck;
4376         struct lfsck_layout     *lo    = com->lc_file_ram;
4377         int                      rc;
4378
4379         if (com->lc_new_checked == 0 && !init)
4380                 return 0;
4381
4382         down_write(&com->lc_sem);
4383         if (init) {
4384                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4385         } else {
4386                 lo->ll_pos_last_checkpoint =
4387                                         lfsck->li_pos_current.lp_oit_cookie;
4388                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4389                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4390                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4391                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4392                 com->lc_new_checked = 0;
4393         }
4394
4395         rc = lfsck_layout_store(env, com);
4396         up_write(&com->lc_sem);
4397
4398         CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4399                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4400                lfsck->li_pos_current.lp_oit_cookie, rc);
4401
4402         return rc;
4403 }
4404
4405 static int lfsck_layout_prep(const struct lu_env *env,
4406                              struct lfsck_component *com,
4407                              struct lfsck_start *start)
4408 {
4409         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4410         struct lfsck_layout     *lo     = com->lc_file_ram;
4411         struct lfsck_position   *pos    = &com->lc_pos_start;
4412
4413         fid_zero(&pos->lp_dir_parent);
4414         pos->lp_dir_cookie = 0;
4415         if (lo->ll_status == LS_COMPLETED ||
4416             lo->ll_status == LS_PARTIAL ||
4417             /* To handle orphan, must scan from the beginning. */
4418             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4419                 int rc;
4420
4421                 rc = lfsck_layout_reset(env, com, false);
4422                 if (rc == 0)
4423                         rc = lfsck_set_param(env, lfsck, start, true);
4424
4425                 if (rc != 0) {
4426                         CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4427                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4428
4429                         return rc;
4430                 }
4431         }
4432
4433         down_write(&com->lc_sem);
4434         lo->ll_time_latest_start = cfs_time_current_sec();
4435         spin_lock(&lfsck->li_lock);
4436         if (lo->ll_flags & LF_SCANNED_ONCE) {
4437                 if (!lfsck->li_drop_dryrun ||
4438                     lo->ll_pos_first_inconsistent == 0) {
4439                         lo->ll_status = LS_SCANNING_PHASE2;
4440                         list_move_tail(&com->lc_link,
4441                                        &lfsck->li_list_double_scan);
4442                         pos->lp_oit_cookie = 0;
4443                 } else {
4444                         int i;
4445
4446                         lo->ll_status = LS_SCANNING_PHASE1;
4447                         lo->ll_run_time_phase1 = 0;
4448                         lo->ll_run_time_phase2 = 0;
4449                         lo->ll_objs_checked_phase1 = 0;
4450                         lo->ll_objs_checked_phase2 = 0;
4451                         lo->ll_objs_failed_phase1 = 0;
4452                         lo->ll_objs_failed_phase2 = 0;
4453                         for (i = 0; i < LLIT_MAX; i++)
4454                                 lo->ll_objs_repaired[i] = 0;
4455
4456                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4457                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4458                 }
4459         } else {
4460                 lo->ll_status = LS_SCANNING_PHASE1;
4461                 if (!lfsck->li_drop_dryrun ||
4462                     lo->ll_pos_first_inconsistent == 0)
4463                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4464                 else
4465                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4466         }
4467         spin_unlock(&lfsck->li_lock);
4468         up_write(&com->lc_sem);
4469
4470         return 0;
4471 }
4472
4473 static int lfsck_layout_slave_prep(const struct lu_env *env,
4474                                    struct lfsck_component *com,
4475                                    struct lfsck_start_param *lsp)
4476 {
4477         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4478         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4479         struct lfsck_layout             *lo     = com->lc_file_ram;
4480         struct lfsck_start              *start  = lsp->lsp_start;
4481         int                              rc;
4482
4483         rc = lfsck_layout_prep(env, com, start);
4484         if (rc != 0)
4485                 return rc;
4486
4487         if (lo->ll_flags & LF_CRASHED_LASTID &&
4488             list_empty(&llsd->llsd_master_list)) {
4489                 LASSERT(lfsck->li_out_notify != NULL);
4490
4491                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4492                                      LE_LASTID_REBUILDING);
4493         }
4494
4495         if (!lsp->lsp_index_valid)
4496                 return 0;
4497
4498         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4499         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4500                 LASSERT(!llsd->llsd_rbtree_valid);
4501
4502                 write_lock(&llsd->llsd_rb_lock);
4503                 rc = lfsck_rbtree_setup(env, com);
4504                 write_unlock(&llsd->llsd_rb_lock);
4505         }
4506
4507         CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4508                LPU64"]\n", lfsck_lfsck2name(lfsck),
4509                com->lc_pos_start.lp_oit_cookie);
4510
4511         return rc;
4512 }
4513
4514 static int lfsck_layout_master_prep(const struct lu_env *env,
4515                                     struct lfsck_component *com,
4516                                     struct lfsck_start_param *lsp)
4517 {
4518         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4519         struct lfsck_layout_master_data *llmd    = com->lc_data;
4520         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4521         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4522         struct lfsck_thread_args        *lta;
4523         struct task_struct              *task;
4524         int                              rc;
4525         ENTRY;
4526
4527         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4528         if (rc != 0)
4529                 RETURN(rc);
4530
4531         llmd->llmd_assistant_status = 0;
4532         llmd->llmd_post_result = 0;
4533         llmd->llmd_to_post = 0;
4534         llmd->llmd_to_double_scan = 0;
4535         llmd->llmd_in_double_scan = 0;
4536         llmd->llmd_exit = 0;
4537         thread_set_flags(athread, 0);
4538
4539         lta = lfsck_thread_args_init(lfsck, com, lsp);
4540         if (IS_ERR(lta))
4541                 RETURN(PTR_ERR(lta));
4542
4543         task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout");
4544         if (IS_ERR(task)) {
4545                 rc = PTR_ERR(task);
4546                 CERROR("%s: cannot start LFSCK layout assistant thread: "
4547                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4548                 lfsck_thread_args_fini(lta);
4549         } else {
4550                 struct l_wait_info lwi = { 0 };
4551
4552                 l_wait_event(mthread->t_ctl_waitq,
4553                              thread_is_running(athread) ||
4554                              thread_is_stopped(athread),
4555                              &lwi);
4556                 if (unlikely(!thread_is_running(athread)))
4557                         rc = llmd->llmd_assistant_status;
4558                 else
4559                         rc = 0;
4560         }
4561
4562         CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4563                LPU64"\n", lfsck_lfsck2name(lfsck),
4564                com->lc_pos_start.lp_oit_cookie);
4565
4566         RETURN(rc);
4567 }
4568
4569 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4570 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4571                                      struct lfsck_component *com,
4572                                      struct dt_object *parent,
4573                                      struct lov_mds_md_v1 *lmm)
4574 {
4575         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4576         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4577         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4578         struct lfsck_layout             *lo      = com->lc_file_ram;
4579         struct lfsck_layout_master_data *llmd    = com->lc_data;
4580         struct lfsck_layout_object      *llo     = NULL;
4581         struct lov_ost_data_v1          *objs;
4582         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4583         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4584         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4585                 struct l_wait_info       lwi     = { 0 };
4586         struct lu_buf                    buf;
4587         int                              rc      = 0;
4588         int                              i;
4589         __u32                            magic;
4590         __u16                            count;
4591         __u16                            gen;
4592         ENTRY;
4593
4594         lfsck_buf_init(&buf, &info->lti_old_pfid,
4595                        sizeof(struct filter_fid_old));
4596         count = le16_to_cpu(lmm->lmm_stripe_count);
4597         gen = le16_to_cpu(lmm->lmm_layout_gen);
4598         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4599          * been verified in lfsck_layout_verify_header() already. If some
4600          * new magic introduced in the future, then layout LFSCK needs to
4601          * be updated also. */
4602         magic = le32_to_cpu(lmm->lmm_magic);
4603         if (magic == LOV_MAGIC_V1) {
4604                 objs = &lmm->lmm_objects[0];
4605         } else {
4606                 LASSERT(magic == LOV_MAGIC_V3);
4607                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4608         }
4609
4610         for (i = 0; i < count; i++, objs++) {
4611                 struct lu_fid           *fid    = &info->lti_fid;
4612                 struct ost_id           *oi     = &info->lti_oi;
4613                 struct lfsck_layout_req *llr;
4614                 struct lfsck_tgt_desc   *tgt    = NULL;
4615                 struct dt_object        *cobj   = NULL;
4616                 __u32                    index;
4617                 bool                     wakeup = false;
4618
4619                 if (unlikely(lovea_slot_is_dummy(objs)))
4620                         continue;
4621
4622                 l_wait_event(mthread->t_ctl_waitq,
4623                              bk->lb_async_windows == 0 ||
4624                              llmd->llmd_prefetched < bk->lb_async_windows ||
4625                              !thread_is_running(mthread) ||
4626                              thread_is_stopped(athread),
4627                              &lwi);
4628
4629                 if (unlikely(!thread_is_running(mthread)) ||
4630                              thread_is_stopped(athread))
4631                         GOTO(out, rc = 0);
4632
4633                 if (unlikely(lfsck_is_dead_obj(parent)))
4634                         GOTO(out, rc = 0);
4635
4636                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4637                 index = le32_to_cpu(objs->l_ost_idx);
4638                 rc = ostid_to_fid(fid, oi, index);
4639                 if (rc != 0) {
4640                         CDEBUG(D_LFSCK, "%s: get invalid layout EA for "DFID
4641                                ": "DOSTID", idx:%u\n", lfsck_lfsck2name(lfsck),
4642                                PFID(lfsck_dto2fid(parent)), POSTID(oi), index);
4643                         goto next;
4644                 }
4645
4646                 tgt = lfsck_tgt_get(ltds, index);
4647                 if (unlikely(tgt == NULL)) {
4648                         CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4649                                "did not join the layout LFSCK\n",
4650                                lfsck_lfsck2name(lfsck), index);
4651                         lo->ll_flags |= LF_INCOMPLETE;
4652                         goto next;
4653                 }
4654
4655                 /* There is potential deadlock race condition between object
4656                  * destroy and layout LFSCK. Consider the following scenario:
4657                  *
4658                  * 1) The LFSCK thread obtained the parent object firstly, at
4659                  *    that time, the parent object has not been destroyed yet.
4660                  *
4661                  * 2) One RPC service thread destroyed the parent and all its
4662                  *    children objects. Because the LFSCK is referencing the
4663                  *    parent object, then the parent object will be marked as
4664                  *    dying in RAM. On the other hand, the parent object is
4665                  *    referencing all its children objects, then all children
4666                  *    objects will be marked as dying in RAM also.
4667                  *
4668                  * 3) The LFSCK thread tries to find some child object with
4669                  *    the parent object referenced. Then it will find that the
4670                  *    child object is dying. According to the object visibility
4671                  *    rules: the object with dying flag cannot be returned to
4672                  *    others. So the LFSCK thread has to wait until the dying
4673                  *    object has been purged from RAM, then it can allocate a
4674                  *    new object (with the same FID) in RAM. Unfortunately, the
4675                  *    LFSCK thread itself is referencing the parent object, and
4676                  *    cause the parent object cannot be purged, then cause the
4677                  *    child object cannot be purged also. So the LFSCK thread
4678                  *    will fall into deadlock.
4679                  *
4680                  * We introduce non-blocked version lu_object_find() to allow
4681                  * the LFSCK thread to return failure immediately (instead of
4682                  * wait) when it finds dying (child) object, then the LFSCK
4683                  * thread can check whether the parent object is dying or not.
4684                  * So avoid above deadlock. LU-5395 */
4685                 cobj = lfsck_object_find_by_dev_nowait(env, tgt->ltd_tgt, fid);
4686                 if (IS_ERR(cobj)) {
4687                         if (lfsck_is_dead_obj(parent)) {
4688                                 lfsck_tgt_put(tgt);
4689
4690                                 GOTO(out, rc = 0);
4691                         }
4692
4693                         rc = PTR_ERR(cobj);
4694                         goto next;
4695                 }
4696
4697                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4698                 if (rc != 0)
4699                         goto next;
4700
4701                 rc = dt_declare_xattr_get(env, cobj, &buf, XATTR_NAME_FID,
4702                                           BYPASS_CAPA);
4703                 if (rc != 0)
4704                         goto next;
4705
4706                 if (llo == NULL) {
4707                         llo = lfsck_layout_object_init(env, parent, gen);
4708                         if (IS_ERR(llo)) {
4709                                 rc = PTR_ERR(llo);
4710                                 goto next;
4711                         }
4712                 }
4713
4714                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4715                 if (IS_ERR(llr)) {
4716                         rc = PTR_ERR(llr);
4717                         goto next;
4718                 }
4719
4720                 cobj = NULL;
4721                 spin_lock(&llmd->llmd_lock);
4722                 if (llmd->llmd_assistant_status < 0) {
4723                         spin_unlock(&llmd->llmd_lock);
4724                         lfsck_layout_req_fini(env, llr);
4725                         lfsck_tgt_put(tgt);
4726                         RETURN(llmd->llmd_assistant_status);
4727                 }
4728
4729                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4730                 if (llmd->llmd_prefetched == 0)
4731                         wakeup = true;
4732
4733                 llmd->llmd_prefetched++;
4734                 spin_unlock(&llmd->llmd_lock);
4735                 if (wakeup)
4736                         wake_up_all(&athread->t_ctl_waitq);
4737
4738 next:
4739                 down_write(&com->lc_sem);
4740                 com->lc_new_checked++;
4741                 if (rc < 0)
4742                         lfsck_layout_record_failure(env, lfsck, lo);
4743                 up_write(&com->lc_sem);
4744
4745                 if (cobj != NULL && !IS_ERR(cobj))
4746                         lu_object_put(env, &cobj->do_lu);
4747
4748                 if (likely(tgt != NULL))
4749                         lfsck_tgt_put(tgt);
4750
4751                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4752                         GOTO(out, rc);
4753         }
4754
4755         GOTO(out, rc = 0);
4756
4757 out:
4758         if (llo != NULL && !IS_ERR(llo))
4759                 lfsck_layout_object_put(env, llo);
4760
4761         return rc;
4762 }
4763
4764 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4765  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4766  * list ::llmd_req_list.
4767  *
4768  * For each request on above list, the lfsck_layout_assistant thread compares
4769  * the OST side attribute with local attribute, if inconsistent, then repair it.
4770  *
4771  * All above processing is async mode with pipeline. */
4772 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4773                                         struct lfsck_component *com,
4774                                         struct dt_object *obj)
4775 {
4776         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4777         struct ost_id                   *oi     = &info->lti_oi;
4778         struct lfsck_layout             *lo     = com->lc_file_ram;
4779         struct lfsck_layout_master_data *llmd   = com->lc_data;
4780         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4781         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4782         struct thandle                  *handle = NULL;
4783         struct lu_buf                   *buf    = &info->lti_big_buf;
4784         struct lov_mds_md_v1            *lmm    = NULL;
4785         struct dt_device                *dev    = lfsck->li_bottom;
4786         struct lustre_handle             lh     = { 0 };
4787         struct lu_buf                    ea_buf = { 0 };
4788         int                              rc     = 0;
4789         int                              size   = 0;
4790         bool                             locked = false;
4791         bool                             stripe = false;
4792         bool                             bad_oi = false;
4793         ENTRY;
4794
4795         if (!S_ISREG(lfsck_object_type(obj)))
4796                 GOTO(out, rc = 0);
4797
4798         if (llmd->llmd_assistant_status < 0)
4799                 GOTO(out, rc = -ESRCH);
4800
4801         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4802         lmm_oi_cpu_to_le(oi, oi);
4803         dt_read_lock(env, obj, 0);
4804         locked = true;
4805
4806 again:
4807         if (dt_object_exists(obj) == 0 ||
4808             lfsck_is_dead_obj(obj))
4809                 GOTO(out, rc = 0);
4810
4811         rc = lfsck_layout_get_lovea(env, obj, buf);
4812         if (rc <= 0)
4813                 GOTO(out, rc);
4814
4815         size = rc;
4816         lmm = buf->lb_buf;
4817         rc = lfsck_layout_verify_header(lmm);
4818         /* If the LOV EA crashed, then it is possible to be rebuilt later
4819          * when handle orphan OST-objects. */
4820         if (rc != 0)
4821                 GOTO(out, rc);
4822
4823         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4824                 GOTO(out, stripe = true);
4825
4826         /* Inconsistent lmm_oi, should be repaired. */
4827         bad_oi = true;
4828         lmm->lmm_oi = *oi;
4829
4830         if (bk->lb_param & LPF_DRYRUN) {
4831                 down_write(&com->lc_sem);
4832                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4833                 up_write(&com->lc_sem);
4834
4835                 GOTO(out, stripe = true);
4836         }
4837
4838         if (!lustre_handle_is_used(&lh)) {
4839                 dt_read_unlock(env, obj);
4840                 locked = false;
4841                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
4842                                       MDS_INODELOCK_LAYOUT |
4843                                       MDS_INODELOCK_XATTR, LCK_EX);
4844                 if (rc != 0)
4845                         GOTO(out, rc);
4846
4847                 handle = dt_trans_create(env, dev);
4848                 if (IS_ERR(handle))
4849                         GOTO(out, rc = PTR_ERR(handle));
4850
4851                 lfsck_buf_init(&ea_buf, lmm, size);
4852                 rc = dt_declare_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4853                                           LU_XATTR_REPLACE, handle);
4854                 if (rc != 0)
4855                         GOTO(out, rc);
4856
4857                 rc = dt_trans_start_local(env, dev, handle);
4858                 if (rc != 0)
4859                         GOTO(out, rc);
4860
4861                 dt_write_lock(env, obj, 0);
4862                 locked = true;
4863
4864                 goto again;
4865         }
4866
4867         rc = dt_xattr_set(env, obj, &ea_buf, XATTR_NAME_LOV,
4868                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4869         if (rc != 0)
4870                 GOTO(out, rc);
4871
4872         down_write(&com->lc_sem);
4873         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4874         up_write(&com->lc_sem);
4875
4876         GOTO(out, stripe = true);
4877
4878 out:
4879         if (locked) {
4880                 if (lustre_handle_is_used(&lh))
4881                         dt_write_unlock(env, obj);
4882                 else
4883                         dt_read_unlock(env, obj);
4884         }
4885
4886         if (handle != NULL && !IS_ERR(handle))
4887                 dt_trans_stop(env, dev, handle);
4888
4889         lfsck_ibits_unlock(&lh, LCK_EX);
4890
4891         if (bad_oi)
4892                 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4893                        DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4894                        bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4895                        PFID(lfsck_dto2fid(obj)), rc);
4896
4897         if (stripe) {
4898                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4899         } else {
4900                 down_write(&com->lc_sem);
4901                 com->lc_new_checked++;
4902                 if (rc < 0)
4903                         lfsck_layout_record_failure(env, lfsck, lo);
4904                 up_write(&com->lc_sem);
4905         }
4906
4907         return rc;
4908 }
4909
4910 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4911                                        struct lfsck_component *com,
4912                                        struct dt_object *obj)
4913 {
4914         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4915         struct lfsck_layout             *lo     = com->lc_file_ram;
4916         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4917         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4918         struct lfsck_layout_seq         *lls;
4919         __u64                            seq;
4920         __u64                            oid;
4921         int                              rc;
4922         ENTRY;
4923
4924         LASSERT(llsd != NULL);
4925
4926         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4927             cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4928                 struct l_wait_info       lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4929                                                            NULL, NULL);
4930                 struct ptlrpc_thread    *thread = &lfsck->li_thread;
4931
4932                 l_wait_event(thread->t_ctl_waitq,
4933                              !thread_is_running(thread),
4934                              &lwi);
4935         }
4936
4937         lfsck_rbtree_update_bitmap(env, com, fid, false);
4938
4939         down_write(&com->lc_sem);
4940         if (fid_is_idif(fid))
4941                 seq = 0;
4942         else if (!fid_is_norm(fid) ||
4943                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4944                 GOTO(unlock, rc = 0);
4945         else
4946                 seq = fid_seq(fid);
4947         com->lc_new_checked++;
4948
4949         lls = lfsck_layout_seq_lookup(llsd, seq);
4950         if (lls == NULL) {
4951                 OBD_ALLOC_PTR(lls);
4952                 if (unlikely(lls == NULL))
4953                         GOTO(unlock, rc = -ENOMEM);
4954
4955                 INIT_LIST_HEAD(&lls->lls_list);
4956                 lls->lls_seq = seq;
4957                 rc = lfsck_layout_lastid_load(env, com, lls);
4958                 if (rc != 0) {
4959                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4960                               "load LAST_ID for "LPX64": rc = %d\n",
4961                               lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4962                         lo->ll_objs_failed_phase1++;
4963                         OBD_FREE_PTR(lls);
4964                         GOTO(unlock, rc);
4965                 }
4966
4967                 lfsck_layout_seq_insert(llsd, lls);
4968         }
4969
4970         if (unlikely(fid_is_last_id(fid)))
4971                 GOTO(unlock, rc = 0);
4972
4973         if (fid_is_idif(fid))
4974                 oid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
4975         else
4976                 oid = fid_oid(fid);
4977
4978         if (oid > lls->lls_lastid_known)
4979                 lls->lls_lastid_known = oid;
4980
4981         if (oid > lls->lls_lastid) {
4982                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4983                         /* OFD may create new objects during LFSCK scanning. */
4984                         rc = lfsck_layout_lastid_reload(env, com, lls);
4985                         if (unlikely(rc != 0)) {
4986                                 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4987                                       "reload LAST_ID for "LPX64": rc = %d\n",
4988                                       lfsck_lfsck2name(com->lc_lfsck),
4989                                       lls->lls_seq, rc);
4990
4991                                 GOTO(unlock, rc);
4992                         }
4993
4994                         if (oid <= lls->lls_lastid ||
4995                             lo->ll_flags & LF_CRASHED_LASTID)
4996                                 GOTO(unlock, rc = 0);
4997
4998                         LASSERT(lfsck->li_out_notify != NULL);
4999
5000                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5001                                              LE_LASTID_REBUILDING);
5002                         lo->ll_flags |= LF_CRASHED_LASTID;
5003
5004                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
5005                                "LAST_ID file (2) for the sequence "LPX64
5006                                ", old value "LPU64", known value "LPU64"\n",
5007                                lfsck_lfsck2name(lfsck), lls->lls_seq,
5008                                lls->lls_lastid, oid);
5009                 }
5010
5011                 lls->lls_lastid = oid;
5012                 lls->lls_dirty = 1;
5013         }
5014
5015         GOTO(unlock, rc = 0);
5016
5017 unlock:
5018         up_write(&com->lc_sem);
5019
5020         return rc;
5021 }
5022
/*
 * Directory-entry handler of the layout LFSCK: it does nothing with the
 * entry and always reports success.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
				 struct lfsck_component *com,
				 struct dt_object *obj, struct lu_dirent *ent)
{
	return 0;
}
5030
5031 static int lfsck_layout_master_post(const struct lu_env *env,
5032                                     struct lfsck_component *com,
5033                                     int result, bool init)
5034 {
5035         struct lfsck_instance           *lfsck   = com->lc_lfsck;
5036         struct lfsck_layout             *lo      = com->lc_file_ram;
5037         struct lfsck_layout_master_data *llmd    = com->lc_data;
5038         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
5039         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5040         struct l_wait_info               lwi     = { 0 };
5041         int                              rc;
5042         ENTRY;
5043
5044
5045         llmd->llmd_post_result = result;
5046         llmd->llmd_to_post = 1;
5047         if (llmd->llmd_post_result <= 0)
5048                 llmd->llmd_exit = 1;
5049
5050         wake_up_all(&athread->t_ctl_waitq);
5051         l_wait_event(mthread->t_ctl_waitq,
5052                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
5053                      thread_is_stopped(athread),
5054                      &lwi);
5055
5056         if (llmd->llmd_assistant_status < 0)
5057                 result = llmd->llmd_assistant_status;
5058
5059         down_write(&com->lc_sem);
5060         spin_lock(&lfsck->li_lock);
5061         /* When LFSCK failed, there may be some prefetched objects those are
5062          * not been processed yet, we do not know the exactly position, then
5063          * just restart from last check-point next time. */
5064         if (!init && !llmd->llmd_exit)
5065                 lo->ll_pos_last_checkpoint =
5066                                         lfsck->li_pos_current.lp_oit_cookie;
5067
5068         if (result > 0) {
5069                 lo->ll_status = LS_SCANNING_PHASE2;
5070                 lo->ll_flags |= LF_SCANNED_ONCE;
5071                 lo->ll_flags &= ~LF_UPGRADE;
5072                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5073         } else if (result == 0) {
5074                 lo->ll_status = lfsck->li_status;
5075                 if (lo->ll_status == 0)
5076                         lo->ll_status = LS_STOPPED;
5077                 if (lo->ll_status != LS_PAUSED) {
5078                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5079                 }
5080         } else {
5081                 lo->ll_status = LS_FAILED;
5082                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5083         }
5084         spin_unlock(&lfsck->li_lock);
5085
5086         if (!init) {
5087                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5088                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5089                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5090                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5091                 com->lc_new_checked = 0;
5092         }
5093
5094         rc = lfsck_layout_store(env, com);
5095         up_write(&com->lc_sem);
5096
5097         CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
5098                lfsck_lfsck2name(lfsck), rc);
5099
5100         RETURN(rc);
5101 }
5102
5103 static int lfsck_layout_slave_post(const struct lu_env *env,
5104                                    struct lfsck_component *com,
5105                                    int result, bool init)
5106 {
5107         struct lfsck_instance   *lfsck = com->lc_lfsck;
5108         struct lfsck_layout     *lo    = com->lc_file_ram;
5109         int                      rc;
5110         bool                     done  = false;
5111
5112         rc = lfsck_layout_lastid_store(env, com);
5113         if (rc != 0)
5114                 result = rc;
5115
5116         LASSERT(lfsck->li_out_notify != NULL);
5117
5118         down_write(&com->lc_sem);
5119         spin_lock(&lfsck->li_lock);
5120         if (!init)
5121                 lo->ll_pos_last_checkpoint =
5122                                         lfsck->li_pos_current.lp_oit_cookie;
5123         if (result > 0) {
5124                 lo->ll_status = LS_SCANNING_PHASE2;
5125                 lo->ll_flags |= LF_SCANNED_ONCE;
5126                 if (lo->ll_flags & LF_CRASHED_LASTID) {
5127                         done = true;
5128                         lo->ll_flags &= ~LF_CRASHED_LASTID;
5129
5130                         CDEBUG(D_LFSCK, "%s: layout LFSCK has rebuilt "
5131                                "crashed LAST_ID files successfully\n",
5132                                lfsck_lfsck2name(lfsck));
5133                 }
5134                 lo->ll_flags &= ~LF_UPGRADE;
5135                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
5136         } else if (result == 0) {
5137                 lo->ll_status = lfsck->li_status;
5138                 if (lo->ll_status == 0)
5139                         lo->ll_status = LS_STOPPED;
5140                 if (lo->ll_status != LS_PAUSED)
5141                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5142         } else {
5143                 lo->ll_status = LS_FAILED;
5144                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
5145         }
5146         spin_unlock(&lfsck->li_lock);
5147
5148         if (done)
5149                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5150                                      LE_LASTID_REBUILT);
5151
5152         if (!init) {
5153                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5154                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5155                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5156                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5157                 com->lc_new_checked = 0;
5158         }
5159
5160         rc = lfsck_layout_store(env, com);
5161         up_write(&com->lc_sem);
5162
5163         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
5164
5165         if (result <= 0)
5166                 lfsck_rbtree_cleanup(env, com);
5167
5168         CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
5169                lfsck_lfsck2name(lfsck), rc);
5170
5171         return rc;
5172 }
5173
5174 static int lfsck_layout_dump(const struct lu_env *env,
5175                              struct lfsck_component *com, struct seq_file *m)
5176 {
5177         struct lfsck_instance   *lfsck = com->lc_lfsck;
5178         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
5179         struct lfsck_layout     *lo    = com->lc_file_ram;
5180         int                      rc;
5181
5182         down_read(&com->lc_sem);
5183         seq_printf(m, "name: lfsck_layout\n"
5184                       "magic: %#x\n"
5185                       "version: %d\n"
5186                       "status: %s\n",
5187                       lo->ll_magic,
5188                       bk->lb_version,
5189                       lfsck_status2names(lo->ll_status));
5190
5191         rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
5192         if (rc < 0)
5193                 goto out;
5194
5195         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
5196         if (rc < 0)
5197                 goto out;
5198
5199         rc = lfsck_time_dump(m, lo->ll_time_last_complete,
5200                              "time_since_last_completed");
5201         if (rc < 0)
5202                 goto out;
5203
5204         rc = lfsck_time_dump(m, lo->ll_time_latest_start,
5205                              "time_since_latest_start");
5206         if (rc < 0)
5207                 goto out;
5208
5209         rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
5210                              "time_since_last_checkpoint");
5211         if (rc < 0)
5212                 goto out;
5213
5214         seq_printf(m, "latest_start_position: "LPU64"\n"
5215                       "last_checkpoint_position: "LPU64"\n"
5216                       "first_failure_position: "LPU64"\n",
5217                       lo->ll_pos_latest_start,
5218                       lo->ll_pos_last_checkpoint,
5219                       lo->ll_pos_first_inconsistent);
5220
5221         seq_printf(m, "success_count: %u\n"
5222                       "repaired_dangling: "LPU64"\n"
5223                       "repaired_unmatched_pair: "LPU64"\n"
5224                       "repaired_multiple_referenced: "LPU64"\n"
5225                       "repaired_orphan: "LPU64"\n"
5226                       "repaired_inconsistent_owner: "LPU64"\n"
5227                       "repaired_others: "LPU64"\n"
5228                       "skipped: "LPU64"\n"
5229                       "failed_phase1: "LPU64"\n"
5230                       "failed_phase2: "LPU64"\n",
5231                       lo->ll_success_count,
5232                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
5233                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
5234                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
5235                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
5236                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
5237                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
5238                       lo->ll_objs_skipped,
5239                       lo->ll_objs_failed_phase1,
5240                       lo->ll_objs_failed_phase2);
5241
5242         if (lo->ll_status == LS_SCANNING_PHASE1) {
5243                 __u64 pos;
5244                 const struct dt_it_ops *iops;
5245                 cfs_duration_t duration = cfs_time_current() -
5246                                           lfsck->li_time_last_checkpoint;
5247                 __u64 checked = lo->ll_objs_checked_phase1 +
5248                                 com->lc_new_checked;
5249                 __u64 speed = checked;
5250                 __u64 new_checked = com->lc_new_checked * HZ;
5251                 __u32 rtime = lo->ll_run_time_phase1 +
5252                               cfs_duration_sec(duration + HALF_SEC);
5253
5254                 if (duration != 0)
5255                         do_div(new_checked, duration);
5256                 if (rtime != 0)
5257                         do_div(speed, rtime);
5258                 seq_printf(m, "checked_phase1: "LPU64"\n"
5259                               "checked_phase2: "LPU64"\n"
5260                               "run_time_phase1: %u seconds\n"
5261                               "run_time_phase2: %u seconds\n"
5262                               "average_speed_phase1: "LPU64" items/sec\n"
5263                               "average_speed_phase2: N/A\n"
5264                               "real-time_speed_phase1: "LPU64" items/sec\n"
5265                               "real-time_speed_phase2: N/A\n",
5266                               checked,
5267                               lo->ll_objs_checked_phase2,
5268                               rtime,
5269                               lo->ll_run_time_phase2,
5270                               speed,
5271                               new_checked);
5272
5273                 LASSERT(lfsck->li_di_oit != NULL);
5274
5275                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
5276
5277                 /* The low layer otable-based iteration position may NOT
5278                  * exactly match the layout-based directory traversal
5279                  * cookie. Generally, it is not a serious issue. But the
5280                  * caller should NOT make assumption on that. */
5281                 pos = iops->store(env, lfsck->li_di_oit);
5282                 if (!lfsck->li_current_oit_processed)
5283                         pos--;
5284                 seq_printf(m, "current_position: "LPU64"\n", pos);
5285
5286         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
5287                 cfs_duration_t duration = cfs_time_current() -
5288                                           lfsck->li_time_last_checkpoint;
5289                 __u64 checked = lo->ll_objs_checked_phase2 +
5290                                 com->lc_new_checked;
5291                 __u64 speed1 = lo->ll_objs_checked_phase1;
5292                 __u64 speed2 = checked;
5293                 __u64 new_checked = com->lc_new_checked * HZ;
5294                 __u32 rtime = lo->ll_run_time_phase2 +
5295                               cfs_duration_sec(duration + HALF_SEC);
5296
5297                 if (duration != 0)
5298                         do_div(new_checked, duration);
5299                 if (lo->ll_run_time_phase1 != 0)
5300                         do_div(speed1, lo->ll_run_time_phase1);
5301                 if (rtime != 0)
5302                         do_div(speed2, rtime);
5303                 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
5304                                 "checked_phase2: "LPU64"\n"
5305                                 "run_time_phase1: %u seconds\n"
5306                                 "run_time_phase2: %u seconds\n"
5307                                 "average_speed_phase1: "LPU64" items/sec\n"
5308                                 "average_speed_phase2: "LPU64" items/sec\n"
5309                                 "real-time_speed_phase1: N/A\n"
5310                                 "real-time_speed_phase2: "LPU64" items/sec\n"
5311                                 "current_position: "DFID"\n",
5312                                 lo->ll_objs_checked_phase1,
5313                                 checked,
5314                                 lo->ll_run_time_phase1,
5315                                 rtime,
5316                                 speed1,
5317                                 speed2,
5318                                 new_checked,
5319                                 PFID(&com->lc_fid_latest_scanned_phase2));
5320                 if (rc <= 0)
5321                         goto out;
5322
5323         } else {
5324                 __u64 speed1 = lo->ll_objs_checked_phase1;
5325                 __u64 speed2 = lo->ll_objs_checked_phase2;
5326
5327                 if (lo->ll_run_time_phase1 != 0)
5328                         do_div(speed1, lo->ll_run_time_phase1);
5329                 if (lo->ll_run_time_phase2 != 0)
5330                         do_div(speed2, lo->ll_run_time_phase2);
5331                 seq_printf(m, "checked_phase1: "LPU64"\n"
5332                            "checked_phase2: "LPU64"\n"
5333                            "run_time_phase1: %u seconds\n"
5334                            "run_time_phase2: %u seconds\n"
5335                            "average_speed_phase1: "LPU64" items/sec\n"
5336                            "average_speed_phase2: "LPU64" objs/sec\n"
5337                            "real-time_speed_phase1: N/A\n"
5338                            "real-time_speed_phase2: N/A\n"
5339                            "current_position: N/A\n",
5340                            lo->ll_objs_checked_phase1,
5341                            lo->ll_objs_checked_phase2,
5342                            lo->ll_run_time_phase1,
5343                            lo->ll_run_time_phase2,
5344                            speed1,
5345                            speed2);
5346         }
5347 out:
5348         up_read(&com->lc_sem);
5349
5350         return rc;
5351 }
5352
5353 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5354                                            struct lfsck_component *com)
5355 {
5356         struct lfsck_layout_master_data *llmd    = com->lc_data;
5357         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5358         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5359         struct lfsck_layout             *lo      = com->lc_file_ram;
5360         struct l_wait_info               lwi     = { 0 };
5361
5362         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5363                 return 0;
5364
5365         llmd->llmd_to_double_scan = 1;
5366         wake_up_all(&athread->t_ctl_waitq);
5367         l_wait_event(mthread->t_ctl_waitq,
5368                      llmd->llmd_in_double_scan ||
5369                      thread_is_stopped(athread),
5370                      &lwi);
5371         if (llmd->llmd_assistant_status < 0)
5372                 return llmd->llmd_assistant_status;
5373
5374         return 0;
5375 }
5376
5377 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5378                                           struct lfsck_component *com)
5379 {
5380         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5381         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5382         struct lfsck_layout             *lo     = com->lc_file_ram;
5383         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5384         int                              rc;
5385         ENTRY;
5386
5387         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5388                 lfsck_rbtree_cleanup(env, com);
5389                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5390                 RETURN(0);
5391         }
5392
5393         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
5394                lfsck_lfsck2name(lfsck));
5395
5396         atomic_inc(&lfsck->li_double_scan_count);
5397
5398         com->lc_new_checked = 0;
5399         com->lc_new_scanned = 0;
5400         com->lc_time_last_checkpoint = cfs_time_current();
5401         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5402                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5403
5404         while (1) {
5405                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5406                                                      NULL, NULL);
5407
5408                 rc = lfsck_layout_slave_query_master(env, com);
5409                 if (list_empty(&llsd->llsd_master_list)) {
5410                         if (unlikely(!thread_is_running(thread)))
5411                                 rc = 0;
5412                         else
5413                                 rc = 1;
5414
5415                         GOTO(done, rc);
5416                 }
5417
5418                 if (rc < 0)
5419                         GOTO(done, rc);
5420
5421                 rc = l_wait_event(thread->t_ctl_waitq,
5422                                   !thread_is_running(thread) ||
5423                                   list_empty(&llsd->llsd_master_list),
5424                                   &lwi);
5425                 if (unlikely(!thread_is_running(thread)))
5426                         GOTO(done, rc = 0);
5427
5428                 if (rc == -ETIMEDOUT)
5429                         continue;
5430
5431                 GOTO(done, rc = (rc < 0 ? rc : 1));
5432         }
5433
5434 done:
5435         rc = lfsck_layout_double_scan_result(env, com, rc);
5436
5437         lfsck_rbtree_cleanup(env, com);
5438         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5439         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5440                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5441
5442         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5443                "status %d: rc = %d\n",
5444                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
5445
5446         return rc;
5447 }
5448
5449 static void lfsck_layout_master_data_release(const struct lu_env *env,
5450                                              struct lfsck_component *com)
5451 {
5452         struct lfsck_layout_master_data *llmd   = com->lc_data;
5453         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5454         struct lfsck_tgt_descs          *ltds;
5455         struct lfsck_tgt_desc           *ltd;
5456         struct lfsck_tgt_desc           *next;
5457
5458         LASSERT(llmd != NULL);
5459         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5460                 thread_is_stopped(&llmd->llmd_thread));
5461         LASSERT(list_empty(&llmd->llmd_req_list));
5462
5463         com->lc_data = NULL;
5464
5465         ltds = &lfsck->li_ost_descs;
5466         spin_lock(&ltds->ltd_lock);
5467         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5468                                  ltd_layout_phase_list) {
5469                 list_del_init(&ltd->ltd_layout_phase_list);
5470         }
5471         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5472                                  ltd_layout_phase_list) {
5473                 list_del_init(&ltd->ltd_layout_phase_list);
5474         }
5475         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5476                                  ltd_layout_list) {
5477                 list_del_init(&ltd->ltd_layout_list);
5478         }
5479         spin_unlock(&ltds->ltd_lock);
5480
5481         ltds = &lfsck->li_mdt_descs;
5482         spin_lock(&ltds->ltd_lock);
5483         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5484                                  ltd_layout_phase_list) {
5485                 list_del_init(&ltd->ltd_layout_phase_list);
5486         }
5487         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5488                                  ltd_layout_phase_list) {
5489                 list_del_init(&ltd->ltd_layout_phase_list);
5490         }
5491         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5492                                  ltd_layout_list) {
5493                 list_del_init(&ltd->ltd_layout_list);
5494         }
5495         spin_unlock(&ltds->ltd_lock);
5496
5497         OBD_FREE_PTR(llmd);
5498 }
5499
5500 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5501                                             struct lfsck_component *com)
5502 {
5503         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5504         struct lfsck_layout_seq          *lls;
5505         struct lfsck_layout_seq          *next;
5506         struct lfsck_layout_slave_target *llst;
5507         struct lfsck_layout_slave_target *tmp;
5508
5509         LASSERT(llsd != NULL);
5510
5511         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5512                                      lls_list) {
5513                 list_del_init(&lls->lls_list);
5514                 lfsck_object_put(env, lls->lls_lastid_obj);
5515                 OBD_FREE_PTR(lls);
5516         }
5517
5518         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5519                                  llst_list) {
5520                 list_del_init(&llst->llst_list);
5521                 OBD_FREE_PTR(llst);
5522         }
5523
5524         lfsck_rbtree_cleanup(env, com);
5525         com->lc_data = NULL;
5526         OBD_FREE_PTR(llsd);
5527 }
5528
5529 static void lfsck_layout_master_quit(const struct lu_env *env,
5530                                      struct lfsck_component *com)
5531 {
5532         struct lfsck_layout_master_data *llmd    = com->lc_data;
5533         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5534         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5535         struct l_wait_info               lwi     = { 0 };
5536
5537         llmd->llmd_exit = 1;
5538         wake_up_all(&athread->t_ctl_waitq);
5539         l_wait_event(mthread->t_ctl_waitq,
5540                      thread_is_init(athread) ||
5541                      thread_is_stopped(athread),
5542                      &lwi);
5543 }
5544
/**
 * Quit handler of the layout LFSCK slave.
 *
 * The only work done here is cleaning up the rbtree.
 *
 * \param[in] env       execution environment
 * \param[in] com       the layout LFSCK component
 */
static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
5550
5551 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5552                                          struct lfsck_component *com,
5553                                          struct lfsck_request *lr)
5554 {
5555         struct lfsck_instance           *lfsck = com->lc_lfsck;
5556         struct lfsck_layout             *lo    = com->lc_file_ram;
5557         struct lfsck_layout_master_data *llmd  = com->lc_data;
5558         struct lfsck_tgt_descs          *ltds;
5559         struct lfsck_tgt_desc           *ltd;
5560         bool                             fail  = false;
5561         ENTRY;
5562
5563         if (lr->lr_event == LE_PAIRS_VERIFY) {
5564                 int rc;
5565
5566                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5567                                                      &lr->lr_fid2);
5568
5569                 RETURN(rc);
5570         }
5571
5572         CDEBUG(D_LFSCK, "%s: layout LFSCK master handle notify %u "
5573                "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
5574                lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5575                lr->lr_index, lr->lr_status);
5576
5577         if (lr->lr_event != LE_PHASE1_DONE &&
5578             lr->lr_event != LE_PHASE2_DONE &&
5579             lr->lr_event != LE_PEER_EXIT)
5580                 RETURN(-EINVAL);
5581
5582         if (lr->lr_flags & LEF_FROM_OST)
5583                 ltds = &lfsck->li_ost_descs;
5584         else
5585                 ltds = &lfsck->li_mdt_descs;
5586         spin_lock(&ltds->ltd_lock);
5587         ltd = LTD_TGT(ltds, lr->lr_index);
5588         if (ltd == NULL) {
5589                 spin_unlock(&ltds->ltd_lock);
5590
5591                 RETURN(-ENXIO);
5592         }
5593
5594         list_del_init(&ltd->ltd_layout_phase_list);
5595         switch (lr->lr_event) {
5596         case LE_PHASE1_DONE:
5597                 if (lr->lr_status <= 0) {
5598                         ltd->ltd_layout_done = 1;
5599                         list_del_init(&ltd->ltd_layout_list);
5600                         lo->ll_flags |= LF_INCOMPLETE;
5601                         fail = true;
5602                         break;
5603                 }
5604
5605                 if (lr->lr_flags & LEF_FROM_OST) {
5606                         if (list_empty(&ltd->ltd_layout_list))
5607                                 list_add_tail(&ltd->ltd_layout_list,
5608                                               &llmd->llmd_ost_list);
5609                         list_add_tail(&ltd->ltd_layout_phase_list,
5610                                       &llmd->llmd_ost_phase2_list);
5611                 } else {
5612                         if (list_empty(&ltd->ltd_layout_list))
5613                                 list_add_tail(&ltd->ltd_layout_list,
5614                                               &llmd->llmd_mdt_list);
5615                         list_add_tail(&ltd->ltd_layout_phase_list,
5616                                       &llmd->llmd_mdt_phase2_list);
5617                 }
5618                 break;
5619         case LE_PHASE2_DONE:
5620                 ltd->ltd_layout_done = 1;
5621                 list_del_init(&ltd->ltd_layout_list);
5622                 break;
5623         case LE_PEER_EXIT:
5624                 fail = true;
5625                 ltd->ltd_layout_done = 1;
5626                 list_del_init(&ltd->ltd_layout_list);
5627                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
5628                         lo->ll_flags |= LF_INCOMPLETE;
5629                 break;
5630         default:
5631                 break;
5632         }
5633         spin_unlock(&ltds->ltd_lock);
5634
5635         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5636                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5637
5638                 memset(stop, 0, sizeof(*stop));
5639                 stop->ls_status = lr->lr_status;
5640                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5641                 lfsck_stop(env, lfsck->li_bottom, stop);
5642         } else if (lfsck_layout_master_to_orphan(llmd)) {
5643                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5644         }
5645
5646         RETURN(0);
5647 }
5648
5649 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5650                                         struct lfsck_component *com,
5651                                         struct lfsck_request *lr)
5652 {
5653         struct lfsck_instance            *lfsck = com->lc_lfsck;
5654         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5655         struct lfsck_layout_slave_target *llst;
5656         int                               rc;
5657         ENTRY;
5658
5659         switch (lr->lr_event) {
5660         case LE_FID_ACCESSED:
5661                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5662                 RETURN(0);
5663         case LE_CONDITIONAL_DESTROY:
5664                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5665                 RETURN(rc);
5666         case LE_PAIRS_VERIFY: {
5667                 lr->lr_status = LPVS_INIT;
5668                 /* Firstly, if the MDT-object which is claimed via OST-object
5669                  * local stored PFID xattr recognizes the OST-object, then it
5670                  * must be that the client given PFID is wrong. */
5671                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5672                                                     &lr->lr_fid3);
5673                 if (rc <= 0)
5674                         RETURN(0);
5675
5676                 lr->lr_status = LPVS_INCONSISTENT;
5677                 /* The OST-object local stored PFID xattr is stale. We need to
5678                  * check whether the MDT-object that is claimed via the client
5679                  * given PFID information recognizes the OST-object or not. If
5680                  * matches, then need to update the OST-object's PFID xattr. */
5681                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5682                                                     &lr->lr_fid2);
5683                 /* For rc < 0 case:
5684                  * We are not sure whether the client given PFID information
5685                  * is correct or not, do nothing to avoid improper fixing.
5686                  *
5687                  * For rc > 0 case:
5688                  * The client given PFID information is also invalid, we can
5689                  * NOT fix the OST-object inconsistency.
5690                  */
5691                 if (rc != 0)
5692                         RETURN(rc);
5693
5694                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5695                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5696
5697                 RETURN(rc);
5698         }
5699         case LE_PHASE2_DONE:
5700         case LE_PEER_EXIT:
5701                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5702                        "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5703                        lr->lr_event, lr->lr_index, lr->lr_status);
5704                 break;
5705         default:
5706                 RETURN(-EINVAL);
5707         }
5708
5709         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5710         if (llst == NULL)
5711                 RETURN(-ENXIO);
5712
5713         lfsck_layout_llst_put(llst);
5714         if (list_empty(&llsd->llsd_master_list))
5715                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5716
5717         if (lr->lr_event == LE_PEER_EXIT &&
5718             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5719                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5720
5721                 memset(stop, 0, sizeof(*stop));
5722                 stop->ls_status = lr->lr_status;
5723                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5724                 lfsck_stop(env, lfsck->li_bottom, stop);
5725         }
5726
5727         RETURN(0);
5728 }
5729
5730 static int lfsck_layout_query(const struct lu_env *env,
5731                               struct lfsck_component *com)
5732 {
5733         struct lfsck_layout *lo = com->lc_file_ram;
5734
5735         return lo->ll_status;
5736 }
5737
5738 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5739                                            struct lfsck_component *com,
5740                                            struct lfsck_tgt_descs *ltds,
5741                                            struct lfsck_tgt_desc *ltd,
5742                                            struct ptlrpc_request_set *set)
5743 {
5744         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5745         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5746         struct lfsck_request              *lr    = &info->lti_lr;
5747         struct lfsck_instance             *lfsck = com->lc_lfsck;
5748         int                                rc;
5749
5750         spin_lock(&ltds->ltd_lock);
5751         if (list_empty(&ltd->ltd_layout_list)) {
5752                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5753                 spin_unlock(&ltds->ltd_lock);
5754
5755                 return 0;
5756         }
5757
5758         list_del_init(&ltd->ltd_layout_phase_list);
5759         list_del_init(&ltd->ltd_layout_list);
5760         spin_unlock(&ltds->ltd_lock);
5761
5762         memset(lr, 0, sizeof(*lr));
5763         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5764         lr->lr_event = LE_PEER_EXIT;
5765         lr->lr_active = LFSCK_TYPE_LAYOUT;
5766         lr->lr_status = LS_CO_PAUSED;
5767         if (ltds == &lfsck->li_ost_descs)
5768                 lr->lr_flags = LEF_TO_OST;
5769
5770         laia->laia_com = com;
5771         laia->laia_ltds = ltds;
5772         atomic_inc(&ltd->ltd_ref);
5773         laia->laia_ltd = ltd;
5774         laia->laia_lr = lr;
5775         laia->laia_shared = 0;
5776
5777         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5778                                  lfsck_layout_master_async_interpret,
5779                                  laia, LFSCK_NOTIFY);
5780         if (rc != 0) {
5781                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to notify %s %x "
5782                        "for co-stop: rc = %d\n",
5783                        lfsck_lfsck2name(lfsck),
5784                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5785                        ltd->ltd_index, rc);
5786                 lfsck_tgt_put(ltd);
5787         }
5788
5789         return rc;
5790 }
5791
5792 /* with lfsck::li_lock held */
5793 static int lfsck_layout_slave_join(const struct lu_env *env,
5794                                    struct lfsck_component *com,
5795                                    struct lfsck_start_param *lsp)
5796 {
5797         struct lfsck_instance            *lfsck = com->lc_lfsck;
5798         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5799         struct lfsck_layout_slave_target *llst;
5800         struct lfsck_start               *start = lsp->lsp_start;
5801         int                               rc    = 0;
5802         ENTRY;
5803
5804         if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
5805                 RETURN(0);
5806
5807         if (!lsp->lsp_index_valid)
5808                 RETURN(-EINVAL);
5809
5810         /* If someone is running the LFSCK without orphan handling,
5811          * it will not maintain the object accessing rbtree. So we
5812          * cannot join it for orphan handling. */
5813         if (!llsd->llsd_rbtree_valid)
5814                 RETURN(-EBUSY);
5815
5816         spin_unlock(&lfsck->li_lock);
5817         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5818         spin_lock(&lfsck->li_lock);
5819         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5820                 spin_unlock(&lfsck->li_lock);
5821                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5822                                                       true);
5823                 if (llst != NULL)
5824                         lfsck_layout_llst_put(llst);
5825                 spin_lock(&lfsck->li_lock);
5826                 rc = -EAGAIN;
5827         }
5828
5829         RETURN(rc);
5830 }
5831
5832 static struct lfsck_operations lfsck_layout_master_ops = {
5833         .lfsck_reset            = lfsck_layout_reset,
5834         .lfsck_fail             = lfsck_layout_fail,
5835         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
5836         .lfsck_prep             = lfsck_layout_master_prep,
5837         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
5838         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5839         .lfsck_post             = lfsck_layout_master_post,
5840         .lfsck_interpret        = lfsck_layout_master_async_interpret,
5841         .lfsck_dump             = lfsck_layout_dump,
5842         .lfsck_double_scan      = lfsck_layout_master_double_scan,
5843         .lfsck_data_release     = lfsck_layout_master_data_release,
5844         .lfsck_quit             = lfsck_layout_master_quit,
5845         .lfsck_in_notify        = lfsck_layout_master_in_notify,
5846         .lfsck_query            = lfsck_layout_query,
5847         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
5848 };
5849
5850 static struct lfsck_operations lfsck_layout_slave_ops = {
5851         .lfsck_reset            = lfsck_layout_reset,
5852         .lfsck_fail             = lfsck_layout_fail,
5853         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
5854         .lfsck_prep             = lfsck_layout_slave_prep,
5855         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
5856         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5857         .lfsck_post             = lfsck_layout_slave_post,
5858         .lfsck_dump             = lfsck_layout_dump,
5859         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5860         .lfsck_data_release     = lfsck_layout_slave_data_release,
5861         .lfsck_quit             = lfsck_layout_slave_quit,
5862         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5863         .lfsck_query            = lfsck_layout_query,
5864         .lfsck_join             = lfsck_layout_slave_join,
5865 };
5866
5867 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5868 {
5869         struct lfsck_component  *com;
5870         struct lfsck_layout     *lo;
5871         struct dt_object        *root = NULL;
5872         struct dt_object        *obj;
5873         int                      rc;
5874         ENTRY;
5875
5876         OBD_ALLOC_PTR(com);
5877         if (com == NULL)
5878                 RETURN(-ENOMEM);
5879
5880         INIT_LIST_HEAD(&com->lc_link);
5881         INIT_LIST_HEAD(&com->lc_link_dir);
5882         init_rwsem(&com->lc_sem);
5883         atomic_set(&com->lc_ref, 1);
5884         com->lc_lfsck = lfsck;
5885         com->lc_type = LFSCK_TYPE_LAYOUT;
5886         if (lfsck->li_master) {
5887                 struct lfsck_layout_master_data *llmd;
5888
5889                 com->lc_ops = &lfsck_layout_master_ops;
5890                 OBD_ALLOC_PTR(llmd);
5891                 if (llmd == NULL)
5892                         GOTO(out, rc = -ENOMEM);
5893
5894                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5895                 spin_lock_init(&llmd->llmd_lock);
5896                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5897                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5898                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5899                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5900                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5901                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5902                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5903                 com->lc_data = llmd;
5904         } else {
5905                 struct lfsck_layout_slave_data *llsd;
5906
5907                 com->lc_ops = &lfsck_layout_slave_ops;
5908                 OBD_ALLOC_PTR(llsd);
5909                 if (llsd == NULL)
5910                         GOTO(out, rc = -ENOMEM);
5911
5912                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5913                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5914                 spin_lock_init(&llsd->llsd_lock);
5915                 llsd->llsd_rb_root = RB_ROOT;
5916                 rwlock_init(&llsd->llsd_rb_lock);
5917                 com->lc_data = llsd;
5918         }
5919         com->lc_file_size = sizeof(*lo);
5920         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5921         if (com->lc_file_ram == NULL)
5922                 GOTO(out, rc = -ENOMEM);
5923
5924         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5925         if (com->lc_file_disk == NULL)
5926                 GOTO(out, rc = -ENOMEM);
5927
5928         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5929         if (IS_ERR(root))
5930                 GOTO(out, rc = PTR_ERR(root));
5931
5932         if (unlikely(!dt_try_as_dir(env, root)))
5933                 GOTO(out, rc = -ENOTDIR);
5934
5935         obj = local_file_find_or_create(env, lfsck->li_los, root,
5936                                         lfsck_layout_name,
5937                                         S_IFREG | S_IRUGO | S_IWUSR);
5938         if (IS_ERR(obj))
5939                 GOTO(out, rc = PTR_ERR(obj));
5940
5941         com->lc_obj = obj;
5942         rc = lfsck_layout_load(env, com);
5943         if (rc > 0)
5944                 rc = lfsck_layout_reset(env, com, true);
5945         else if (rc == -ENOENT)
5946                 rc = lfsck_layout_init(env, com);
5947
5948         if (rc != 0)
5949                 GOTO(out, rc);
5950
5951         lo = com->lc_file_ram;
5952         switch (lo->ll_status) {
5953         case LS_INIT:
5954         case LS_COMPLETED:
5955         case LS_FAILED:
5956         case LS_STOPPED:
5957         case LS_PARTIAL:
5958                 spin_lock(&lfsck->li_lock);
5959                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5960                 spin_unlock(&lfsck->li_lock);
5961                 break;
5962         default:
5963                 CERROR("%s: unknown lfsck_layout status %d\n",
5964                        lfsck_lfsck2name(lfsck), lo->ll_status);
5965                 /* fall through */
5966         case LS_SCANNING_PHASE1:
5967         case LS_SCANNING_PHASE2:
5968                 /* No need to store the status to disk right now.
5969                  * If the system crashed before the status stored,
5970                  * it will be loaded back when next time. */
5971                 lo->ll_status = LS_CRASHED;
5972                 lo->ll_flags |= LF_INCOMPLETE;
5973                 /* fall through */
5974         case LS_PAUSED:
5975         case LS_CRASHED:
5976         case LS_CO_FAILED:
5977         case LS_CO_STOPPED:
5978         case LS_CO_PAUSED:
5979                 spin_lock(&lfsck->li_lock);
5980                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5981                 spin_unlock(&lfsck->li_lock);
5982                 break;
5983         }
5984
5985         if (lo->ll_flags & LF_CRASHED_LASTID) {
5986                 LASSERT(lfsck->li_out_notify != NULL);
5987
5988                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5989                                      LE_LASTID_REBUILDING);
5990         }
5991
5992         GOTO(out, rc = 0);
5993
5994 out:
5995         if (root != NULL && !IS_ERR(root))
5996                 lu_object_put(env, &root->do_lu);
5997
5998         if (rc != 0) {
5999                 lfsck_component_cleanup(env, com);
6000                 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
6001                        lfsck_lfsck2name(lfsck), rc);
6002         }
6003
6004         return rc;
6005 }
6006
6007 struct lfsck_orphan_it {
6008         struct lfsck_component           *loi_com;
6009         struct lfsck_rbtree_node         *loi_lrn;
6010         struct lfsck_layout_slave_target *loi_llst;
6011         struct lu_fid                     loi_key;
6012         struct lu_orphan_rec              loi_rec;
6013         __u64                             loi_hash;
6014         unsigned int                      loi_over:1;
6015 };
6016
6017 static int lfsck_fid_match_idx(const struct lu_env *env,
6018                                struct lfsck_instance *lfsck,
6019                                const struct lu_fid *fid, int idx)
6020 {
6021         struct seq_server_site  *ss;
6022         struct lu_server_fld    *sf;
6023         struct lu_seq_range      range  = { 0 };
6024         int                      rc;
6025
6026         /* All abnormal cases will be returned to MDT0. */
6027         if (!fid_is_norm(fid)) {
6028                 if (idx == 0)
6029                         return 1;
6030
6031                 return 0;
6032         }
6033
6034         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
6035         if (unlikely(ss == NULL))
6036                 return -ENOTCONN;
6037
6038         sf = ss->ss_server_fld;
6039         LASSERT(sf != NULL);
6040
6041         fld_range_set_any(&range);
6042         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
6043         if (rc != 0)
6044                 return rc;
6045
6046         if (!fld_range_is_mdt(&range))
6047                 return -EINVAL;
6048
6049         if (range.lsr_index == idx)
6050                 return 1;
6051
6052         return 0;
6053 }
6054
6055 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
6056                                         struct dt_device *dev,
6057                                         struct dt_object *obj)
6058 {
6059         struct thandle *handle;
6060         int             rc;
6061         ENTRY;
6062
6063         handle = dt_trans_create(env, dev);
6064         if (IS_ERR(handle))
6065                 RETURN_EXIT;
6066
6067         rc = dt_declare_ref_del(env, obj, handle);
6068         if (rc != 0)
6069                 GOTO(stop, rc);
6070
6071         rc = dt_declare_destroy(env, obj, handle);
6072         if (rc != 0)
6073                 GOTO(stop, rc);
6074
6075         rc = dt_trans_start_local(env, dev, handle);
6076         if (rc != 0)
6077                 GOTO(stop, rc);
6078
6079         dt_write_lock(env, obj, 0);
6080         rc = dt_ref_del(env, obj, handle);
6081         if (rc == 0)
6082                 rc = dt_destroy(env, obj, handle);
6083         dt_write_unlock(env, obj);
6084
6085         GOTO(stop, rc);
6086
6087 stop:
6088         dt_trans_stop(env, dev, handle);
6089
6090         CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
6091                PFID(lfsck_dto2fid(obj)), rc);
6092
6093         RETURN_EXIT;
6094 }
6095
/**
 * Lookup is not supported on the orphan index.
 *
 * \retval              -EOPNOTSUPP always
 */
static int
lfsck_orphan_index_lookup(const struct lu_env *env, struct dt_object *dt,
                          struct dt_rec *rec, const struct dt_key *key,
                          struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6104
/**
 * Declaring an insertion is not supported on the orphan index.
 *
 * \retval              -EOPNOTSUPP always
 */
static int
lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct dt_rec *rec,
                                  const struct dt_key *key,
                                  struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6113
/**
 * Insertion is not supported on the orphan index.
 *
 * \retval              -EOPNOTSUPP always
 */
static int
lfsck_orphan_index_insert(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_rec *rec, const struct dt_key *key,
                          struct thandle *handle, struct lustre_capa *capa,
                          int ignore_quota)
{
        return -EOPNOTSUPP;
}
6124
/**
 * Declaring a deletion is not supported on the orphan index.
 *
 * \retval              -EOPNOTSUPP always
 */
static int
lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct dt_key *key,
                                  struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6132
/**
 * Deletion is not supported on the orphan index.
 *
 * \retval              -EOPNOTSUPP always
 */
static int
lfsck_orphan_index_delete(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_key *key, struct thandle *handle,
                          struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6141
6142 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
6143                                           struct dt_object *dt,
6144                                           __u32 attr,
6145                                           struct lustre_capa *capa)
6146 {
6147         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
6148         struct lfsck_instance           *lfsck;
6149         struct lfsck_component          *com    = NULL;
6150         struct lfsck_layout_slave_data  *llsd;
6151         struct lfsck_orphan_it          *it     = NULL;
6152         int                              rc     = 0;
6153         ENTRY;
6154
6155         lfsck = lfsck_instance_find(dev, true, false);
6156         if (unlikely(lfsck == NULL))
6157                 RETURN(ERR_PTR(-ENXIO));
6158
6159         com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
6160         if (unlikely(com == NULL))
6161                 GOTO(out, rc = -ENOENT);
6162
6163         llsd = com->lc_data;
6164         if (!llsd->llsd_rbtree_valid)
6165                 GOTO(out, rc = -ESRCH);
6166
6167         OBD_ALLOC_PTR(it);
6168         if (it == NULL)
6169                 GOTO(out, rc = -ENOMEM);
6170
6171         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
6172         if (it->loi_llst == NULL)
6173                 GOTO(out, rc = -ENXIO);
6174
6175         if (dev->dd_record_fid_accessed) {
6176                 /* The first iteration against the rbtree, scan the whole rbtree
6177                  * to remove the nodes which do NOT need to be handled. */
6178                 write_lock(&llsd->llsd_rb_lock);
6179                 if (dev->dd_record_fid_accessed) {
6180                         struct rb_node                  *node;
6181                         struct rb_node                  *next;
6182                         struct lfsck_rbtree_node        *lrn;
6183
6184                         /* No need to record the fid accessing anymore. */
6185                         dev->dd_record_fid_accessed = 0;
6186
6187                         node = rb_first(&llsd->llsd_rb_root);
6188                         while (node != NULL) {
6189                                 next = rb_next(node);
6190                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
6191                                                lrn_node);
6192                                 if (atomic_read(&lrn->lrn_known_count) <=
6193                                     atomic_read(&lrn->lrn_accessed_count)) {
6194                                         rb_erase(node, &llsd->llsd_rb_root);
6195                                         lfsck_rbtree_free(lrn);
6196                                 }
6197                                 node = next;
6198                         }
6199                 }
6200                 write_unlock(&llsd->llsd_rb_lock);
6201         }
6202
6203         /* read lock the rbtree when init, and unlock when fini */
6204         read_lock(&llsd->llsd_rb_lock);
6205         it->loi_com = com;
6206         com = NULL;
6207
6208         GOTO(out, rc = 0);
6209
6210 out:
6211         if (com != NULL)
6212                 lfsck_component_put(env, com);
6213
6214         CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
6215                lfsck_lfsck2name(lfsck), rc);
6216
6217         lfsck_instance_put(env, lfsck);
6218         if (rc != 0) {
6219                 if (it != NULL)
6220                         OBD_FREE_PTR(it);
6221
6222                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
6223         }
6224
6225         return (struct dt_it *)it;
6226 }
6227
6228 static void lfsck_orphan_it_fini(const struct lu_env *env,
6229                                  struct dt_it *di)
6230 {
6231         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
6232         struct lfsck_component           *com   = it->loi_com;
6233         struct lfsck_layout_slave_data   *llsd;
6234         struct lfsck_layout_slave_target *llst;
6235
6236         if (com != NULL) {
6237                 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
6238                        lfsck_lfsck2name(com->lc_lfsck));
6239
6240                 llsd = com->lc_data;
6241                 read_unlock(&llsd->llsd_rb_lock);
6242                 llst = it->loi_llst;
6243                 LASSERT(llst != NULL);
6244
6245                 /* Save the key and hash for iterate next. */
6246                 llst->llst_fid = it->loi_key;
6247                 llst->llst_hash = it->loi_hash;
6248                 lfsck_layout_llst_put(llst);
6249                 lfsck_component_put(env, com);
6250         }
6251         OBD_FREE_PTR(it);
6252 }
6253
6254 /**
6255  * \retval       +1: the iteration finished
6256  * \retval        0: on success, not finished
6257  * \retval      -ve: on error
6258  */
6259 static int lfsck_orphan_it_next(const struct lu_env *env,
6260                                 struct dt_it *di)
6261 {
6262         struct lfsck_thread_info        *info   = lfsck_env_info(env);
6263         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
6264         struct lu_attr                  *la     = &info->lti_la;
6265         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
6266         struct lu_fid                   *key    = &it->loi_key;
6267         struct lu_orphan_rec            *rec    = &it->loi_rec;
6268         struct lfsck_component          *com    = it->loi_com;
6269         struct lfsck_instance           *lfsck  = com->lc_lfsck;
6270         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
6271         struct dt_object                *obj;
6272         struct lfsck_rbtree_node        *lrn;
6273         int                              pos;
6274         int                              rc;
6275         __u32                            save;
6276         __u32                            idx    = it->loi_llst->llst_index;
6277         bool                             exact  = false;
6278         ENTRY;
6279
6280         if (it->loi_over)
6281                 RETURN(1);
6282
6283 again0:
6284         lrn = it->loi_lrn;
6285         if (lrn == NULL) {
6286                 lrn = lfsck_rbtree_search(llsd, key, &exact);
6287                 if (lrn == NULL) {
6288                         it->loi_over = 1;
6289                         RETURN(1);
6290                 }
6291
6292                 it->loi_lrn = lrn;
6293                 if (!exact) {
6294                         key->f_seq = lrn->lrn_seq;
6295                         key->f_oid = lrn->lrn_first_oid;
6296                         key->f_ver = 0;
6297                 }
6298         } else {
6299                 key->f_oid++;
6300                 if (unlikely(key->f_oid == 0)) {
6301                         key->f_seq++;
6302                         it->loi_lrn = NULL;
6303                         goto again0;
6304                 }
6305
6306                 if (key->f_oid >=
6307                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
6308                         it->loi_lrn = NULL;
6309                         goto again0;
6310                 }
6311         }
6312
6313         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
6314                      atomic_read(&lrn->lrn_accessed_count))) {
6315                 struct rb_node *next = rb_next(&lrn->lrn_node);
6316
6317                 while (next != NULL) {
6318                         lrn = rb_entry(next, struct lfsck_rbtree_node,
6319                                        lrn_node);
6320                         if (atomic_read(&lrn->lrn_known_count) >
6321                             atomic_read(&lrn->lrn_accessed_count))
6322                                 break;
6323                         next = rb_next(next);
6324                 }
6325
6326                 if (next == NULL) {
6327                         it->loi_over = 1;
6328                         RETURN(1);
6329                 }
6330
6331                 it->loi_lrn = lrn;
6332                 key->f_seq = lrn->lrn_seq;
6333                 key->f_oid = lrn->lrn_first_oid;
6334                 key->f_ver = 0;
6335         }
6336
6337         pos = key->f_oid - lrn->lrn_first_oid;
6338
6339 again1:
6340         pos = find_next_bit(lrn->lrn_known_bitmap,
6341                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
6342         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
6343                 key->f_oid = lrn->lrn_first_oid + pos;
6344                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
6345                         key->f_seq++;
6346                         key->f_oid = 0;
6347                 }
6348                 it->loi_lrn = NULL;
6349                 goto again0;
6350         }
6351
6352         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
6353                 pos++;
6354                 goto again1;
6355         }
6356
6357         key->f_oid = lrn->lrn_first_oid + pos;
6358         obj = lfsck_object_find(env, lfsck, key);
6359         if (IS_ERR(obj)) {
6360                 rc = PTR_ERR(obj);
6361                 if (rc == -ENOENT) {
6362                         pos++;
6363                         goto again1;
6364                 }
6365                 RETURN(rc);
6366         }
6367
6368         dt_read_lock(env, obj, 0);
6369         if (dt_object_exists(obj) == 0 ||
6370             lfsck_is_dead_obj(obj)) {
6371                 dt_read_unlock(env, obj);
6372                 lfsck_object_put(env, obj);
6373                 pos++;
6374                 goto again1;
6375         }
6376
6377         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
6378         if (rc != 0)
6379                 GOTO(out, rc);
6380
6381         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6382                           XATTR_NAME_FID, BYPASS_CAPA);
6383         if (rc == -ENODATA) {
6384                 /* For the pre-created OST-object, update the bitmap to avoid
6385                  * others LFSCK (second phase) iteration to touch it again. */
6386                 if (la->la_ctime == 0) {
6387                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6388                                 atomic_inc(&lrn->lrn_accessed_count);
6389
6390                         /* For the race between repairing dangling referenced
6391                          * MDT-object and unlink the file, it may left orphan
6392                          * OST-object there. Destroy it now! */
6393                         if (unlikely(!(la->la_mode & S_ISUID))) {
6394                                 dt_read_unlock(env, obj);
6395                                 lfsck_layout_destroy_orphan(env,
6396                                                             lfsck->li_bottom,
6397                                                             obj);
6398                                 lfsck_object_put(env, obj);
6399                                 pos++;
6400                                 goto again1;
6401                         }
6402                 } else if (idx == 0) {
6403                         /* If the orphan OST-object has no parent information,
6404                          * regard it as referenced by the MDT-object on MDT0. */
6405                         fid_zero(&rec->lor_fid);
6406                         rec->lor_uid = la->la_uid;
6407                         rec->lor_gid = la->la_gid;
6408                         GOTO(out, rc = 0);
6409                 }
6410
6411                 dt_read_unlock(env, obj);
6412                 lfsck_object_put(env, obj);
6413                 pos++;
6414                 goto again1;
6415         }
6416
6417         if (rc < 0)
6418                 GOTO(out, rc);
6419
6420         if (rc != sizeof(struct filter_fid) &&
6421             rc != sizeof(struct filter_fid_old))
6422                 GOTO(out, rc = -EINVAL);
6423
6424         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6425         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6426          * MDT-object's FID::f_ver, instead it is the OST-object index in its
6427          * parent MDT-object's layout EA. */
6428         save = rec->lor_fid.f_stripe_idx;
6429         rec->lor_fid.f_ver = 0;
6430         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6431         /* If the orphan OST-object does not claim the MDT, then next.
6432          *
6433          * If we do not know whether it matches or not, then return it
6434          * to the MDT for further check. */
6435         if (rc == 0) {
6436                 dt_read_unlock(env, obj);
6437                 lfsck_object_put(env, obj);
6438                 pos++;
6439                 goto again1;
6440         }
6441
6442         rec->lor_fid.f_stripe_idx = save;
6443         rec->lor_uid = la->la_uid;
6444         rec->lor_gid = la->la_gid;
6445
6446         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6447                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6448                rec->lor_uid, rec->lor_gid);
6449
6450         GOTO(out, rc = 0);
6451
6452 out:
6453         dt_read_unlock(env, obj);
6454         lfsck_object_put(env, obj);
6455         if (rc == 0)
6456                 it->loi_hash++;
6457
6458         return rc;
6459 }
6460
6461 /**
6462  * \retval       +1: locate to the exactly position
6463  * \retval        0: cannot locate to the exactly position,
6464  *                   call next() to move to a valid position.
6465  * \retval      -ve: on error
6466  */
6467 static int lfsck_orphan_it_get(const struct lu_env *env,
6468                                struct dt_it *di,
6469                                const struct dt_key *key)
6470 {
6471         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6472         int                      rc;
6473
6474         it->loi_key = *(struct lu_fid *)key;
6475         rc = lfsck_orphan_it_next(env, di);
6476         if (rc == 1)
6477                 return 0;
6478
6479         if (rc == 0)
6480                 return 1;
6481
6482         return rc;
6483 }
6484
/* Nothing to release for the orphan iteration: intentionally empty. */
static void
lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
}
6489
6490 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6491                                           const struct dt_it *di)
6492 {
6493         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6494
6495         return (struct dt_key *)&it->loi_key;
6496 }
6497
6498 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6499                                     const struct dt_it *di)
6500 {
6501         return sizeof(struct lu_fid);
6502 }
6503
6504 static int lfsck_orphan_it_rec(const struct lu_env *env,
6505                                const struct dt_it *di,
6506                                struct dt_rec *rec,
6507                                __u32 attr)
6508 {
6509         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6510
6511         *(struct lu_orphan_rec *)rec = it->loi_rec;
6512
6513         return 0;
6514 }
6515
6516 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6517                                    const struct dt_it *di)
6518 {
6519         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6520
6521         return it->loi_hash;
6522 }
6523
6524 /**
6525  * \retval       +1: locate to the exactly position
6526  * \retval        0: cannot locate to the exactly position,
6527  *                   call next() to move to a valid position.
6528  * \retval      -ve: on error
6529  */
6530 static int lfsck_orphan_it_load(const struct lu_env *env,
6531                                 const struct dt_it *di,
6532                                 __u64 hash)
6533 {
6534         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6535         struct lfsck_layout_slave_target *llst = it->loi_llst;
6536         int                               rc;
6537
6538         LASSERT(llst != NULL);
6539
6540         if (hash != llst->llst_hash) {
6541                 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6542                        "iteration does not match the one when fini "
6543                        LPU64", to be reset.\n",
6544                        lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6545                        llst->llst_hash);
6546                 fid_zero(&llst->llst_fid);
6547                 llst->llst_hash = 0;
6548         }
6549
6550         it->loi_key = llst->llst_fid;
6551         it->loi_hash = llst->llst_hash;
6552         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6553         if (rc == 1)
6554                 return 0;
6555
6556         if (rc == 0)
6557                 return 1;
6558
6559         return rc;
6560 }
6561
/**
 * No combined key/record is produced for the orphan iteration.
 *
 * \retval              0 always; \a key_rec is left untouched
 */
static int
lfsck_orphan_it_key_rec(const struct lu_env *env, const struct dt_it *di,
                        void *key_rec)
{
        return 0;
}
6568
6569 const struct dt_index_operations lfsck_orphan_index_ops = {
6570         .dio_lookup             = lfsck_orphan_index_lookup,
6571         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
6572         .dio_insert             = lfsck_orphan_index_insert,
6573         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
6574         .dio_delete             = lfsck_orphan_index_delete,
6575         .dio_it = {
6576                 .init           = lfsck_orphan_it_init,
6577                 .fini           = lfsck_orphan_it_fini,
6578                 .get            = lfsck_orphan_it_get,
6579                 .put            = lfsck_orphan_it_put,
6580                 .next           = lfsck_orphan_it_next,
6581                 .key            = lfsck_orphan_it_key,
6582                 .key_size       = lfsck_orphan_it_key_size,
6583                 .rec            = lfsck_orphan_it_rec,
6584                 .store          = lfsck_orphan_it_store,
6585                 .load           = lfsck_orphan_it_load,
6586                 .key_rec        = lfsck_orphan_it_key_rec,
6587         }
6588 };