Whamcloud - gitweb
3db63dc87f627aab09a5c647997811bfe1c39f88
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic number kept in lfsck_layout::ll_magic. lfsck_layout_init() writes it
 * and lfsck_layout_load() asks the caller to reset the file when the loaded
 * value differs. */
#define LFSCK_LAYOUT_MAGIC              0xB173AE14

/* NOTE(review): not referenced in the visible part of this file; presumably
 * the name the layout component is registered/stored under -- confirm. */
static const char lfsck_layout_name[] = "lfsck_layout";
54
/* One entry of lfsck_layout_slave_data::llsd_seq_list, keyed by lls_seq.
 * lfsck_layout_seq_insert() keeps that list sorted by ascending lls_seq and
 * lfsck_layout_seq_lookup() relies on the ordering to stop early. */
struct lfsck_layout_seq {
        /* Link into lfsck_layout_slave_data::llsd_seq_list. */
        struct list_head         lls_list;
        /* The sequence this entry describes; the list's sort/lookup key. */
        __u64                    lls_seq;
        /* NOTE(review): the lls_lastid* members and lls_dirty are not used in
         * the visible part of the file; presumably they track the LAST_ID
         * value/object of the sequence and whether it needs to be written
         * back -- confirm against the users further down. */
        __u64                    lls_lastid;
        __u64                    lls_lastid_known;
        struct dt_object        *lls_lastid_obj;
        unsigned int             lls_dirty:1;
};
63
/* Reference-counted record for one master, kept on the slave's
 * llsd_master_list. llst_index identifies the record on the list:
 * lfsck_layout_llst_add() refuses a second record with the same index.
 * The record is freed by lfsck_layout_llst_put() when llst_ref drops to 0. */
struct lfsck_layout_slave_target {
        /* link into lfsck_layout_slave_data::llsd_master_list. */
        struct list_head        llst_list;
        /* The position for next record in the rbtree for iteration. */
        struct lu_fid           llst_fid;
        /* Dummy hash for iteration against the rbtree. */
        __u64                   llst_hash;
        /* Set to 0 by lfsck_layout_llst_add(); NOTE(review): presumably
         * compared against llsd_touch_gen by later code -- confirm. */
        __u64                   llst_gen;
        /* Starts at 1 (the list's reference) in lfsck_layout_llst_add(). */
        atomic_t                llst_ref;
        /* Lookup key used by lfsck_layout_llst_find_and_del(). */
        __u32                   llst_index;
};
75
/* Slave-side private data of the layout LFSCK component (com->lc_data in the
 * lfsck_rbtree_*() helpers). */
struct lfsck_layout_slave_data {
        /* list for lfsck_layout_seq */
        struct list_head         llsd_seq_list;

        /* list of the masters involved in layout verification. */
        struct list_head         llsd_master_list;
        /* Taken around every access to llsd_master_list in the
         * lfsck_layout_llst_*() helpers. */
        spinlock_t               llsd_lock;
        __u64                    llsd_touch_gen;
        /* In-RAM object standing for the rbtree; set by lfsck_rbtree_setup()
         * and released by lfsck_rbtree_cleanup(). */
        struct dt_object        *llsd_rb_obj;
        /* Root of the tree of lfsck_rbtree_node. */
        struct rb_root           llsd_rb_root;
        /* Taken for read to search the rbtree and for write to insert into it
         * or to clear llsd_rbtree_valid. */
        rwlock_t                 llsd_rb_lock;
        /* Cleared by lfsck_rbtree_cleanup(); once 0, the rbtree must no
         * longer be searched or modified. */
        unsigned int             llsd_rbtree_valid:1;
};
89
/* Reference-counted wrapper around a dt_object, created by
 * lfsck_layout_object_init() and released by lfsck_layout_object_put(). */
struct lfsck_layout_object {
        /* The wrapped object; a reference is held for the wrapper's life. */
        struct dt_object        *llo_obj;
        /* Attributes fetched with dt_attr_get() when the wrapper is made. */
        struct lu_attr           llo_attr;
        /* Starts at 1; each lfsck_layout_req pointing here holds one more. */
        atomic_t                 llo_ref;
        /* Layout generation recorded at pre-fetch time, used to detect that
         * the layout changed before the real verification. */
        __u16                    llo_gen;
};
96
/* One verification request pairing a parent wrapper with one child object.
 * Built by lfsck_layout_req_init(), destroyed by lfsck_layout_req_fini(). */
struct lfsck_layout_req {
        /* NOTE(review): presumably links into
         * lfsck_layout_master_data::llmd_req_list -- the linking code is
         * outside the visible part of the file. */
        struct list_head                 llr_list;
        /* Holds one llo_ref reference on the parent. */
        struct lfsck_layout_object      *llr_parent;
        /* Released with lu_object_put() in lfsck_layout_req_fini(). */
        struct dt_object                *llr_child;
        __u32                            llr_ost_idx;
        __u32                            llr_lov_idx; /* offset in LOV EA */
};
104
/* Master-side private data of the layout LFSCK component. */
struct lfsck_layout_master_data {
        /* Taken around llmd_req_list in lfsck_layout_req_empty(). */
        spinlock_t              llmd_lock;
        /* NOTE(review): presumably the queue of lfsck_layout_req (linked via
         * llr_list) -- confirm against the producer/consumer code. */
        struct list_head        llmd_req_list;

        /* list of the ost targets involved in layout verification. */
        struct list_head        llmd_ost_list;

        /* list for the ost targets in phase1 scanning. */
        struct list_head        llmd_ost_phase1_list;

        /* list for the ost targets in phase2 scanning. */
        struct list_head        llmd_ost_phase2_list;

        /* list of the mdt targets involved in layout verification. */
        struct list_head        llmd_mdt_list;

        /* list for the mdt targets in phase1 scanning. */
        struct list_head        llmd_mdt_phase1_list;

        /* list for the mdt targets in phase2 scanning. */
        struct list_head        llmd_mdt_phase2_list;

        struct ptlrpc_thread    llmd_thread;
        __u32                   llmd_touch_gen;
        int                     llmd_prefetched;
        int                     llmd_assistant_status;
        int                     llmd_post_result;
        unsigned int            llmd_to_post:1,
                                llmd_to_double_scan:1,
                                llmd_in_double_scan:1,
                                llmd_exit:1;
};
137
/* Bundle of an export, a component and a slave target.
 * NOTE(review): the name suggests these are the arguments carried by an
 * asynchronous request issued by the slave; the users are outside the
 * visible part of the file -- confirm. */
struct lfsck_layout_slave_async_args {
        struct obd_export                *llsaa_exp;
        struct lfsck_component           *llsaa_com;
        struct lfsck_layout_slave_target *llsaa_llst;
};
143
144 static struct lfsck_layout_object *
145 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
146                          __u16 gen)
147 {
148         struct lfsck_layout_object *llo;
149         int                         rc;
150
151         OBD_ALLOC_PTR(llo);
152         if (llo == NULL)
153                 return ERR_PTR(-ENOMEM);
154
155         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
156         if (rc != 0) {
157                 OBD_FREE_PTR(llo);
158
159                 return ERR_PTR(rc);
160         }
161
162         lu_object_get(&obj->do_lu);
163         llo->llo_obj = obj;
164         /* The gen can be used to check whether some others have changed the
165          * file layout after LFSCK pre-fetching but before real verification. */
166         llo->llo_gen = gen;
167         atomic_set(&llo->llo_ref, 1);
168
169         return llo;
170 }
171
172 static inline void
173 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
174 {
175         if (atomic_dec_and_test(&llst->llst_ref)) {
176                 LASSERT(list_empty(&llst->llst_list));
177
178                 OBD_FREE_PTR(llst);
179         }
180 }
181
182 static inline int
183 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
184 {
185         struct lfsck_layout_slave_target *llst;
186         struct lfsck_layout_slave_target *tmp;
187         int                               rc   = 0;
188
189         OBD_ALLOC_PTR(llst);
190         if (llst == NULL)
191                 return -ENOMEM;
192
193         INIT_LIST_HEAD(&llst->llst_list);
194         llst->llst_gen = 0;
195         llst->llst_index = index;
196         atomic_set(&llst->llst_ref, 1);
197
198         spin_lock(&llsd->llsd_lock);
199         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
200                 if (tmp->llst_index == index) {
201                         rc = -EALREADY;
202                         break;
203                 }
204         }
205         if (rc == 0)
206                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
207         spin_unlock(&llsd->llsd_lock);
208
209         if (rc != 0)
210                 OBD_FREE_PTR(llst);
211
212         return rc;
213 }
214
215 static inline void
216 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
217                       struct lfsck_layout_slave_target *llst)
218 {
219         bool del = false;
220
221         spin_lock(&llsd->llsd_lock);
222         if (!list_empty(&llst->llst_list)) {
223                 list_del_init(&llst->llst_list);
224                 del = true;
225         }
226         spin_unlock(&llsd->llsd_lock);
227
228         if (del)
229                 lfsck_layout_llst_put(llst);
230 }
231
232 static inline struct lfsck_layout_slave_target *
233 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
234                                __u32 index, bool unlink)
235 {
236         struct lfsck_layout_slave_target *llst;
237
238         spin_lock(&llsd->llsd_lock);
239         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
240                 if (llst->llst_index == index) {
241                         if (unlink)
242                                 list_del_init(&llst->llst_list);
243                         else
244                                 atomic_inc(&llst->llst_ref);
245                         spin_unlock(&llsd->llsd_lock);
246
247                         return llst;
248                 }
249         }
250         spin_unlock(&llsd->llsd_lock);
251
252         return NULL;
253 }
254
255 static inline void lfsck_layout_object_put(const struct lu_env *env,
256                                            struct lfsck_layout_object *llo)
257 {
258         if (atomic_dec_and_test(&llo->llo_ref)) {
259                 lfsck_object_put(env, llo->llo_obj);
260                 OBD_FREE_PTR(llo);
261         }
262 }
263
264 static struct lfsck_layout_req *
265 lfsck_layout_req_init(struct lfsck_layout_object *parent,
266                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
267 {
268         struct lfsck_layout_req *llr;
269
270         OBD_ALLOC_PTR(llr);
271         if (llr == NULL)
272                 return ERR_PTR(-ENOMEM);
273
274         INIT_LIST_HEAD(&llr->llr_list);
275         atomic_inc(&parent->llo_ref);
276         llr->llr_parent = parent;
277         llr->llr_child = child;
278         llr->llr_ost_idx = ost_idx;
279         llr->llr_lov_idx = lov_idx;
280
281         return llr;
282 }
283
284 static inline void lfsck_layout_req_fini(const struct lu_env *env,
285                                          struct lfsck_layout_req *llr)
286 {
287         lu_object_put(env, &llr->llr_child->do_lu);
288         lfsck_layout_object_put(env, llr->llr_parent);
289         OBD_FREE_PTR(llr);
290 }
291
292 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
293 {
294         bool empty = false;
295
296         spin_lock(&llmd->llmd_lock);
297         if (list_empty(&llmd->llmd_req_list))
298                 empty = true;
299         spin_unlock(&llmd->llmd_lock);
300
301         return empty;
302 }
303
304 static int lfsck_layout_get_lovea(const struct lu_env *env,
305                                   struct dt_object *obj, struct lu_buf *buf)
306 {
307         int rc;
308
309 again:
310         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
311         if (rc == -ERANGE) {
312                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
313                                   BYPASS_CAPA);
314                 if (rc <= 0)
315                         return rc;
316
317                 lu_buf_realloc(buf, rc);
318                 if (buf->lb_buf == NULL)
319                         return -ENOMEM;
320
321                 goto again;
322         }
323
324         if (rc == -ENODATA)
325                 rc = 0;
326
327         if (rc <= 0)
328                 return rc;
329
330         if (unlikely(buf->lb_buf == NULL)) {
331                 lu_buf_alloc(buf, rc);
332                 if (buf->lb_buf == NULL)
333                         return -ENOMEM;
334
335                 goto again;
336         }
337
338         return rc;
339 }
340
341 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
342 {
343         __u32 magic;
344         __u32 pattern;
345
346         magic = le32_to_cpu(lmm->lmm_magic);
347         /* If magic crashed, keep it there. Sometime later, during OST-object
348          * orphan handling, if some OST-object(s) back-point to it, it can be
349          * verified and repaired. */
350         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
351                 struct ost_id   oi;
352                 int             rc;
353
354                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
355                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
356                         rc = -EOPNOTSUPP;
357                 else
358                         rc = -EINVAL;
359
360                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
361                        rc == -EINVAL ? "Unknown" : "Unsupported",
362                        magic, POSTID(&oi));
363
364                 return rc;
365         }
366
367         pattern = le32_to_cpu(lmm->lmm_pattern);
368         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
369         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
370                 struct ost_id oi;
371
372                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
373                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
374                        pattern, POSTID(&oi));
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
/* Geometry of the per-node bitmaps of the layout rbtree:
 * SIZE  - bytes allocated for each bitmap (see lfsck_rbtree_new());
 * WIDTH - bits per bitmap, i.e. how many consecutive OIDs one node covers;
 * MASK  - extracts the bit index of an OID within its node, while ~MASK
 *         yields the node's first OID. */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
385
/* One node of the layout rbtree. It covers the LFSCK_RBTREE_BITMAP_WIDTH
 * consecutive OIDs starting at lrn_first_oid within sequence lrn_seq and
 * keeps one bit per OID in each of its two bitmaps. */
struct lfsck_rbtree_node {
        struct rb_node   lrn_node;
        /* Tree key, part 1: the FID sequence. */
        __u64            lrn_seq;
        /* Tree key, part 2: first OID covered, aligned to the bitmap width. */
        __u32            lrn_first_oid;
        /* Number of bits set in lrn_known_bitmap. */
        atomic_t         lrn_known_count;
        /* Number of bits set in lrn_accessed_bitmap. */
        atomic_t         lrn_accessed_count;
        /* Both bitmaps are LFSCK_RBTREE_BITMAP_SIZE bytes. Every accessed
         * object is also marked as known, see lfsck_rbtree_update_bitmap(). */
        void            *lrn_known_bitmap;
        void            *lrn_accessed_bitmap;
};
395
396 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
397                                    __u64 seq, __u32 oid)
398 {
399         if (seq < lrn->lrn_seq)
400                 return -1;
401
402         if (seq > lrn->lrn_seq)
403                 return 1;
404
405         if (oid < lrn->lrn_first_oid)
406                 return -1;
407
408         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
409                 return 1;
410
411         return 0;
412 }
413
414 /* The caller should hold llsd->llsd_rb_lock. */
415 static struct lfsck_rbtree_node *
416 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
417                     const struct lu_fid *fid, bool *exact)
418 {
419         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
420         struct rb_node           *prev  = NULL;
421         struct lfsck_rbtree_node *lrn   = NULL;
422         int                       rc    = 0;
423
424         if (exact != NULL)
425                 *exact = true;
426
427         while (node != NULL) {
428                 prev = node;
429                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
430                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
431                 if (rc < 0)
432                         node = node->rb_left;
433                 else if (rc > 0)
434                         node = node->rb_right;
435                 else
436                         return lrn;
437         }
438
439         if (exact == NULL)
440                 return NULL;
441
442         /* If there is no exactly matched one, then to the next valid one. */
443         *exact = false;
444
445         /* The rbtree is empty. */
446         if (rc == 0)
447                 return NULL;
448
449         if (rc < 0)
450                 return lrn;
451
452         node = rb_next(prev);
453
454         /* The end of the rbtree. */
455         if (node == NULL)
456                 return NULL;
457
458         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
459
460         return lrn;
461 }
462
463 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
464                                                   const struct lu_fid *fid)
465 {
466         struct lfsck_rbtree_node *lrn;
467
468         OBD_ALLOC_PTR(lrn);
469         if (lrn == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
473         if (lrn->lrn_known_bitmap == NULL) {
474                 OBD_FREE_PTR(lrn);
475
476                 return ERR_PTR(-ENOMEM);
477         }
478
479         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
480         if (lrn->lrn_accessed_bitmap == NULL) {
481                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         RB_CLEAR_NODE(&lrn->lrn_node);
488         lrn->lrn_seq = fid_seq(fid);
489         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
490         atomic_set(&lrn->lrn_known_count, 0);
491         atomic_set(&lrn->lrn_accessed_count, 0);
492
493         return lrn;
494 }
495
496 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
497 {
498         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
499         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
500         OBD_FREE_PTR(lrn);
501 }
502
503 /* The caller should hold lock. */
504 static struct lfsck_rbtree_node *
505 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
506                     struct lfsck_rbtree_node *lrn)
507 {
508         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
509         struct rb_node            *parent = NULL;
510         struct lfsck_rbtree_node  *tmp;
511         int                        rc;
512
513         while (*pos != NULL) {
514                 parent = *pos;
515                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
516                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
517                 if (rc < 0)
518                         pos = &(*pos)->rb_left;
519                 else if (rc > 0)
520                         pos = &(*pos)->rb_right;
521                 else
522                         return tmp;
523         }
524
525         rb_link_node(&lrn->lrn_node, parent, pos);
526         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
527
528         return lrn;
529 }
530
/* Index operations installed on the in-RAM rbtree object by
 * lfsck_rbtree_setup(); defined outside the visible part of the file. */
extern const struct dt_index_operations lfsck_orphan_index_ops;
532
533 static int lfsck_rbtree_setup(const struct lu_env *env,
534                               struct lfsck_component *com)
535 {
536         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
537         struct lfsck_instance           *lfsck  = com->lc_lfsck;
538         struct dt_device                *dev    = lfsck->li_bottom;
539         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
540         struct dt_object                *obj;
541
542         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
543         fid->f_oid = lfsck_dev_idx(dev);
544         fid->f_ver = 0;
545         obj = dt_locate(env, dev, fid);
546         if (IS_ERR(obj))
547                 RETURN(PTR_ERR(obj));
548
549         /* Generate an in-RAM object to stand for the layout rbtree.
550          * Scanning the layout rbtree will be via the iteration over
551          * the object. In the future, the rbtree may be written onto
552          * disk with the object.
553          *
554          * Mark the object to be as exist. */
555         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
556         obj->do_index_ops = &lfsck_orphan_index_ops;
557         llsd->llsd_rb_obj = obj;
558         llsd->llsd_rbtree_valid = 1;
559         dev->dd_record_fid_accessed = 1;
560
561         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
562                lfsck_lfsck2name(lfsck));
563
564         return 0;
565 }
566
567 static void lfsck_rbtree_cleanup(const struct lu_env *env,
568                                  struct lfsck_component *com)
569 {
570         struct lfsck_instance           *lfsck = com->lc_lfsck;
571         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
572         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
573         struct rb_node                  *next;
574         struct lfsck_rbtree_node        *lrn;
575
576         lfsck->li_bottom->dd_record_fid_accessed = 0;
577         /* Invalid the rbtree, then no others will use it. */
578         write_lock(&llsd->llsd_rb_lock);
579         llsd->llsd_rbtree_valid = 0;
580         write_unlock(&llsd->llsd_rb_lock);
581
582         while (node != NULL) {
583                 next = rb_next(node);
584                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
585                 rb_erase(node, &llsd->llsd_rb_root);
586                 lfsck_rbtree_free(lrn);
587                 node = next;
588         }
589
590         if (llsd->llsd_rb_obj != NULL) {
591                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
592                 llsd->llsd_rb_obj = NULL;
593         }
594
595         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
596                lfsck_lfsck2name(lfsck));
597 }
598
/**
 * Mark the object \a fid as known, and optionally as accessed, in the
 * layout rbtree, creating the covering rbtree node on demand.
 *
 * FIDs that are not sane, that are LAST_ID FIDs, or that are neither IDIF
 * nor normal FIDs are ignored, as is any FID once the rbtree has been
 * invalidated.
 *
 * If the covering node cannot be allocated while \a accessed is set, the
 * component is flagged LF_INCOMPLETE and the whole rbtree is torn down.
 * An allocation failure with \a accessed unset is silently ignored.
 *
 * \param[in] env       execution environment
 * \param[in] com       layout LFSCK component owning the rbtree
 * \param[in] fid       the object to record
 * \param[in] accessed  also set the bit in the "accessed" bitmap
 */
static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com,
                                       const struct lu_fid *fid,
                                       bool accessed)
{
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct lfsck_rbtree_node        *lrn;
        /* Tells the unlock path which side of llsd_rb_lock is held:
         * false - read lock, true - write lock. */
        bool                             insert = false;
        int                              idx;
        int                              rc     = 0;
        ENTRY;

        if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
                RETURN_EXIT;

        if (!fid_is_idif(fid) && !fid_is_norm(fid))
                RETURN_EXIT;

        read_lock(&llsd->llsd_rb_lock);
        if (!llsd->llsd_rbtree_valid)
                GOTO(unlock, rc = 0);

        lrn = lfsck_rbtree_search(llsd, fid, NULL);
        if (lrn == NULL) {
                struct lfsck_rbtree_node *tmp;

                LASSERT(!insert);

                /* No covering node yet. Drop the read lock, allocate the
                 * node with no lock held, then take the write lock for the
                 * insertion. */
                read_unlock(&llsd->llsd_rb_lock);
                tmp = lfsck_rbtree_new(env, fid);
                if (IS_ERR(tmp))
                        GOTO(out, rc = PTR_ERR(tmp));

                insert = true;
                write_lock(&llsd->llsd_rb_lock);
                /* The tree may have been invalidated while unlocked. */
                if (!llsd->llsd_rbtree_valid) {
                        lfsck_rbtree_free(tmp);
                        GOTO(unlock, rc = 0);
                }

                /* Somebody else may have inserted a covering node while no
                 * lock was held; then use theirs and discard ours. */
                lrn = lfsck_rbtree_insert(llsd, tmp);
                if (lrn != tmp)
                        lfsck_rbtree_free(tmp);
        }

        idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
        /* Any accessed object must be a known object. */
        if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
                atomic_inc(&lrn->lrn_known_count);
        if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
                atomic_inc(&lrn->lrn_accessed_count);

        GOTO(unlock, rc = 0);

unlock:
        if (insert)
                write_unlock(&llsd->llsd_rb_lock);
        else
                read_unlock(&llsd->llsd_rb_lock);
out:
        /* Reached with rc != 0 only from the allocation failure above, with
         * no lock held. */
        if (rc != 0 && accessed) {
                struct lfsck_layout *lo = com->lc_file_ram;

                CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
                       "bitmap, and will cause incorrect LFSCK OST-object "
                       "handling, so disable it to cancel orphan handling "
                       "for related device. rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), rc);

                lo->ll_flags |= LF_INCOMPLETE;
                lfsck_rbtree_cleanup(env, com);
        }
}
672
673 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
674                                    const struct lfsck_layout *src)
675 {
676         int i;
677
678         des->ll_magic = le32_to_cpu(src->ll_magic);
679         des->ll_status = le32_to_cpu(src->ll_status);
680         des->ll_flags = le32_to_cpu(src->ll_flags);
681         des->ll_success_count = le32_to_cpu(src->ll_success_count);
682         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
683         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
684         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
685         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
686         des->ll_time_last_checkpoint =
687                                 le64_to_cpu(src->ll_time_last_checkpoint);
688         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
689         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
690         des->ll_pos_first_inconsistent =
691                         le64_to_cpu(src->ll_pos_first_inconsistent);
692         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
693         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
694         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
695         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
696         for (i = 0; i < LLIT_MAX; i++)
697                 des->ll_objs_repaired[i] =
698                                 le64_to_cpu(src->ll_objs_repaired[i]);
699         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
700 }
701
702 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
703                                    const struct lfsck_layout *src)
704 {
705         int i;
706
707         des->ll_magic = cpu_to_le32(src->ll_magic);
708         des->ll_status = cpu_to_le32(src->ll_status);
709         des->ll_flags = cpu_to_le32(src->ll_flags);
710         des->ll_success_count = cpu_to_le32(src->ll_success_count);
711         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
712         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
713         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
714         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
715         des->ll_time_last_checkpoint =
716                                 cpu_to_le64(src->ll_time_last_checkpoint);
717         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
718         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
719         des->ll_pos_first_inconsistent =
720                         cpu_to_le64(src->ll_pos_first_inconsistent);
721         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
722         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
723         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
724         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
725         for (i = 0; i < LLIT_MAX; i++)
726                 des->ll_objs_repaired[i] =
727                                 cpu_to_le64(src->ll_objs_repaired[i]);
728         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
729 }
730
731 /**
732  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
733  * \retval 0: succeed.
734  * \retval -ve: failed cases.
735  */
736 static int lfsck_layout_load(const struct lu_env *env,
737                              struct lfsck_component *com)
738 {
739         struct lfsck_layout             *lo     = com->lc_file_ram;
740         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
741         ssize_t                          size   = com->lc_file_size;
742         loff_t                           pos    = 0;
743         int                              rc;
744
745         rc = dbo->dbo_read(env, com->lc_obj,
746                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
747                            BYPASS_CAPA);
748         if (rc == 0) {
749                 return -ENOENT;
750         } else if (rc < 0) {
751                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
752                        lfsck_lfsck2name(com->lc_lfsck), rc);
753                 return rc;
754         } else if (rc != size) {
755                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
756                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
757                 return 1;
758         }
759
760         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
761         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
762                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
763                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
764                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
765                 return 1;
766         }
767
768         return 0;
769 }
770
/**
 * Write the in-RAM lfsck_layout (com->lc_file_ram) to disk.
 *
 * The RAM copy is first converted to little-endian into com->lc_file_disk,
 * which is then written at offset 0 of com->lc_obj inside a local
 * transaction on the bottom device.
 *
 * lfsck_layout_init() calls this with com->lc_sem held for write.
 *
 * \retval 0            success
 * \retval -ve          failure to create/declare/start the transaction or
 *                      to write the record
 */
static int lfsck_layout_store(const struct lu_env *env,
                              struct lfsck_component *com)
{
        struct dt_object         *obj           = com->lc_obj;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct lfsck_layout      *lo            = com->lc_file_disk;
        struct thandle           *handle;
        ssize_t                   size          = com->lc_file_size;
        loff_t                    pos           = 0;
        int                       rc;
        ENTRY;

        lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
        handle = dt_trans_create(env, lfsck->li_bottom);
        /* No transaction handle to stop in this case, so skip "out". */
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));

        rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
                                     pos, handle);
        if (rc != 0)
                GOTO(out, rc);

        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0)
                GOTO(out, rc);

        rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
                             handle);

        GOTO(out, rc);

out:
        /* The handle is stopped on success and on every failure after it
         * has been created. */
        dt_trans_stop(env, lfsck->li_bottom, handle);

log:
        if (rc != 0)
                CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
                       lfsck_lfsck2name(lfsck), rc);
        return rc;
}
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         if (bk->lb_param & LPF_DRYRUN)
908                 return 0;
909
910         memset(la, 0, sizeof(*la));
911         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
912         la->la_valid = LA_MODE | LA_UID | LA_GID;
913         dof->dof_type = dt_mode_to_dft(S_IFREG);
914
915         th = dt_trans_create(env, dt);
916         if (IS_ERR(th))
917                 GOTO(log, rc = PTR_ERR(th));
918
919         rc = dt_declare_create(env, obj, la, NULL, dof, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         rc = dt_declare_record_write(env, obj,
924                                      lfsck_buf_get(env, &lastid,
925                                                    sizeof(lastid)),
926                                      pos, th);
927         if (rc != 0)
928                 GOTO(stop, rc);
929
930         rc = dt_trans_start_local(env, dt, th);
931         if (rc != 0)
932                 GOTO(stop, rc);
933
934         dt_write_lock(env, obj, 0);
935         if (likely(dt_object_exists(obj) == 0)) {
936                 rc = dt_create(env, obj, la, NULL, dof, th);
937                 if (rc == 0)
938                         rc = dt_record_write(env, obj,
939                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
940                                 &pos, th);
941         }
942         dt_write_unlock(env, obj);
943
944         GOTO(stop, rc);
945
946 stop:
947         dt_trans_stop(env, dt, th);
948
949 log:
950         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
951                LPX64": rc = %d\n",
952                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
953
954         return rc;
955 }
956
957 static int
958 lfsck_layout_lastid_reload(const struct lu_env *env,
959                            struct lfsck_component *com,
960                            struct lfsck_layout_seq *lls)
961 {
962         __u64   lastid;
963         loff_t  pos     = 0;
964         int     rc;
965
966         dt_read_lock(env, lls->lls_lastid_obj, 0);
967         rc = dt_record_read(env, lls->lls_lastid_obj,
968                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
969         dt_read_unlock(env, lls->lls_lastid_obj);
970         if (unlikely(rc != 0))
971                 return rc;
972
973         lastid = le64_to_cpu(lastid);
974         if (lastid < lls->lls_lastid_known) {
975                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
976                 struct lfsck_layout     *lo     = com->lc_file_ram;
977
978                 lls->lls_lastid = lls->lls_lastid_known;
979                 lls->lls_dirty = 1;
980                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
981                         LASSERT(lfsck->li_out_notify != NULL);
982
983                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
984                                              LE_LASTID_REBUILDING);
985                         lo->ll_flags |= LF_CRASHED_LASTID;
986
987                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
988                                "LAST_ID file (1) for the sequence "LPX64
989                                ", old value "LPU64", known value "LPU64"\n",
990                                lfsck_lfsck2name(lfsck), lls->lls_seq,
991                                lastid, lls->lls_lastid);
992                 }
993         } else if (lastid >= lls->lls_lastid) {
994                 lls->lls_lastid = lastid;
995                 lls->lls_dirty = 0;
996         }
997
998         return 0;
999 }
1000
1001 static int
1002 lfsck_layout_lastid_store(const struct lu_env *env,
1003                           struct lfsck_component *com)
1004 {
1005         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1006         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1007         struct dt_device                *dt     = lfsck->li_bottom;
1008         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1009         struct lfsck_layout_seq         *lls;
1010         struct thandle                  *th;
1011         __u64                            lastid;
1012         int                              rc     = 0;
1013         int                              rc1    = 0;
1014
1015         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1016                 loff_t pos = 0;
1017
1018                 if (!lls->lls_dirty)
1019                         continue;
1020
1021                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1022                        "<seq> "LPX64" as <oid> "LPU64"\n",
1023                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1024
1025                 if (bk->lb_param & LPF_DRYRUN) {
1026                         lls->lls_dirty = 0;
1027                         continue;
1028                 }
1029
1030                 th = dt_trans_create(env, dt);
1031                 if (IS_ERR(th)) {
1032                         rc1 = PTR_ERR(th);
1033                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1034                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1035                                lfsck_lfsck2name(com->lc_lfsck),
1036                                lls->lls_seq, rc1);
1037                         continue;
1038                 }
1039
1040                 lastid = cpu_to_le64(lls->lls_lastid);
1041                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1042                                              lfsck_buf_get(env, &lastid,
1043                                                            sizeof(lastid)),
1044                                              pos, th);
1045                 if (rc != 0)
1046                         goto stop;
1047
1048                 rc = dt_trans_start_local(env, dt, th);
1049                 if (rc != 0)
1050                         goto stop;
1051
1052                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1053                 rc = dt_record_write(env, lls->lls_lastid_obj,
1054                                      lfsck_buf_get(env, &lastid,
1055                                      sizeof(lastid)), &pos, th);
1056                 dt_write_unlock(env, lls->lls_lastid_obj);
1057                 if (rc == 0)
1058                         lls->lls_dirty = 0;
1059
1060 stop:
1061                 dt_trans_stop(env, dt, th);
1062                 if (rc != 0) {
1063                         rc1 = rc;
1064                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1065                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1066                                lfsck_lfsck2name(com->lc_lfsck),
1067                                lls->lls_seq, rc1);
1068                 }
1069         }
1070
1071         return rc1;
1072 }
1073
1074 static int
1075 lfsck_layout_lastid_load(const struct lu_env *env,
1076                          struct lfsck_component *com,
1077                          struct lfsck_layout_seq *lls)
1078 {
1079         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1080         struct lfsck_layout     *lo     = com->lc_file_ram;
1081         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1082         struct dt_object        *obj;
1083         loff_t                   pos    = 0;
1084         int                      rc;
1085         ENTRY;
1086
1087         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1088         obj = dt_locate(env, lfsck->li_bottom, fid);
1089         if (IS_ERR(obj))
1090                 RETURN(PTR_ERR(obj));
1091
1092         /* LAST_ID crashed, to be rebuilt */
1093         if (dt_object_exists(obj) == 0) {
1094                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1095                         LASSERT(lfsck->li_out_notify != NULL);
1096
1097                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1098                                              LE_LASTID_REBUILDING);
1099                         lo->ll_flags |= LF_CRASHED_LASTID;
1100
1101                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1102                                "LAST_ID file for sequence "LPX64"\n",
1103                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1104
1105                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1106                             cfs_fail_val > 0) {
1107                                 struct l_wait_info lwi = LWI_TIMEOUT(
1108                                                 cfs_time_seconds(cfs_fail_val),
1109                                                 NULL, NULL);
1110
1111                                 up_write(&com->lc_sem);
1112                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1113                                              !thread_is_running(&lfsck->li_thread),
1114                                              &lwi);
1115                                 down_write(&com->lc_sem);
1116                         }
1117                 }
1118
1119                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1120         } else {
1121                 dt_read_lock(env, obj, 0);
1122                 rc = dt_read(env, obj,
1123                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1124                         &pos);
1125                 dt_read_unlock(env, obj);
1126                 if (rc != 0 && rc != sizeof(__u64))
1127                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1128
1129                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1130                         LASSERT(lfsck->li_out_notify != NULL);
1131
1132                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1133                                              LE_LASTID_REBUILDING);
1134                         lo->ll_flags |= LF_CRASHED_LASTID;
1135
1136                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1137                                "LAST_ID file for the sequence "LPX64
1138                                ": rc = %d\n",
1139                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1140                 }
1141
1142                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1143                 rc = 0;
1144         }
1145
1146         GOTO(out, rc);
1147
1148 out:
1149         if (rc != 0)
1150                 lfsck_object_put(env, obj);
1151         else
1152                 lls->lls_lastid_obj = obj;
1153
1154         return rc;
1155 }
1156
1157 static void lfsck_layout_record_failure(const struct lu_env *env,
1158                                                  struct lfsck_instance *lfsck,
1159                                                  struct lfsck_layout *lo)
1160 {
1161         lo->ll_objs_failed_phase1++;
1162         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1163                 lo->ll_pos_first_inconsistent =
1164                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1165                                                         lfsck->li_di_oit);
1166
1167                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1168                        "inconsistency at the pos ["LPU64"]\n",
1169                        lfsck_lfsck2name(lfsck),
1170                        lo->ll_pos_first_inconsistent);
1171         }
1172 }
1173
1174 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1175                                                struct ptlrpc_request *req,
1176                                                void *args, int rc)
1177 {
1178         struct lfsck_async_interpret_args *laia = args;
1179         struct lfsck_component            *com  = laia->laia_com;
1180         struct lfsck_layout_master_data   *llmd = com->lc_data;
1181         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1182         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1183         struct lfsck_request              *lr   = laia->laia_lr;
1184
1185         switch (lr->lr_event) {
1186         case LE_START:
1187                 if (rc != 0) {
1188                         struct lfsck_layout *lo = com->lc_file_ram;
1189
1190                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1191                                "start: rc = %d\n",
1192                                lfsck_lfsck2name(com->lc_lfsck),
1193                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1194                                ltd->ltd_index, rc);
1195                         lo->ll_flags |= LF_INCOMPLETE;
1196                         break;
1197                 }
1198
1199                 spin_lock(&ltds->ltd_lock);
1200                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1201                         spin_unlock(&ltds->ltd_lock);
1202                         break;
1203                 }
1204
1205                 if (lr->lr_flags & LEF_TO_OST) {
1206                         if (list_empty(&ltd->ltd_layout_list))
1207                                 list_add_tail(&ltd->ltd_layout_list,
1208                                               &llmd->llmd_ost_list);
1209                         if (list_empty(&ltd->ltd_layout_phase_list))
1210                                 list_add_tail(&ltd->ltd_layout_phase_list,
1211                                               &llmd->llmd_ost_phase1_list);
1212                 } else {
1213                         if (list_empty(&ltd->ltd_layout_list))
1214                                 list_add_tail(&ltd->ltd_layout_list,
1215                                               &llmd->llmd_mdt_list);
1216                         if (list_empty(&ltd->ltd_layout_phase_list))
1217                                 list_add_tail(&ltd->ltd_layout_phase_list,
1218                                               &llmd->llmd_mdt_phase1_list);
1219                 }
1220                 spin_unlock(&ltds->ltd_lock);
1221                 break;
1222         case LE_STOP:
1223         case LE_PHASE1_DONE:
1224         case LE_PHASE2_DONE:
1225         case LE_PEER_EXIT:
1226                 if (rc != 0 && rc != -EALREADY)
1227                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1228                                "event = %d, rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck),
1230                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1231                                ltd->ltd_index, lr->lr_event, rc);
1232                 break;
1233         case LE_QUERY: {
1234                 struct lfsck_reply *reply;
1235
1236                 if (rc != 0) {
1237                         spin_lock(&ltds->ltd_lock);
1238                         list_del_init(&ltd->ltd_layout_phase_list);
1239                         list_del_init(&ltd->ltd_layout_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 }
1243
1244                 reply = req_capsule_server_get(&req->rq_pill,
1245                                                &RMF_LFSCK_REPLY);
1246                 if (reply == NULL) {
1247                         rc = -EPROTO;
1248                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1249                                lfsck_lfsck2name(com->lc_lfsck), rc);
1250                         spin_lock(&ltds->ltd_lock);
1251                         list_del_init(&ltd->ltd_layout_phase_list);
1252                         list_del_init(&ltd->ltd_layout_list);
1253                         spin_unlock(&ltds->ltd_lock);
1254                         break;
1255                 }
1256
1257                 switch (reply->lr_status) {
1258                 case LS_SCANNING_PHASE1:
1259                         break;
1260                 case LS_SCANNING_PHASE2:
1261                         spin_lock(&ltds->ltd_lock);
1262                         list_del_init(&ltd->ltd_layout_phase_list);
1263                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1264                                 spin_unlock(&ltds->ltd_lock);
1265                                 break;
1266                         }
1267
1268                         if (lr->lr_flags & LEF_TO_OST)
1269                                 list_add_tail(&ltd->ltd_layout_phase_list,
1270                                               &llmd->llmd_ost_phase2_list);
1271                         else
1272                                 list_add_tail(&ltd->ltd_layout_phase_list,
1273                                               &llmd->llmd_mdt_phase2_list);
1274                         spin_unlock(&ltds->ltd_lock);
1275                         break;
1276                 default:
1277                         spin_lock(&ltds->ltd_lock);
1278                         list_del_init(&ltd->ltd_layout_phase_list);
1279                         list_del_init(&ltd->ltd_layout_list);
1280                         spin_unlock(&ltds->ltd_lock);
1281                         break;
1282                 }
1283                 break;
1284         }
1285         default:
1286                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1287                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1288                 break;
1289         }
1290
1291         if (!laia->laia_shared) {
1292                 lfsck_tgt_put(ltd);
1293                 lfsck_component_put(env, com);
1294         }
1295
1296         return 0;
1297 }
1298
1299 static int lfsck_layout_master_query_others(const struct lu_env *env,
1300                                             struct lfsck_component *com)
1301 {
1302         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1303         struct lfsck_request              *lr    = &info->lti_lr;
1304         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1305         struct lfsck_instance             *lfsck = com->lc_lfsck;
1306         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1307         struct ptlrpc_request_set         *set;
1308         struct lfsck_tgt_descs            *ltds;
1309         struct lfsck_tgt_desc             *ltd;
1310         struct list_head                  *head;
1311         int                                rc    = 0;
1312         int                                rc1   = 0;
1313         ENTRY;
1314
1315         set = ptlrpc_prep_set();
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318
1319         llmd->llmd_touch_gen++;
1320         memset(lr, 0, sizeof(*lr));
1321         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1322         lr->lr_event = LE_QUERY;
1323         lr->lr_active = LFSCK_TYPE_LAYOUT;
1324         laia->laia_com = com;
1325         laia->laia_lr = lr;
1326         laia->laia_shared = 0;
1327
1328         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1329                 ltds = &lfsck->li_mdt_descs;
1330                 lr->lr_flags = 0;
1331                 head = &llmd->llmd_mdt_phase1_list;
1332         } else {
1333
1334 again:
1335                 ltds = &lfsck->li_ost_descs;
1336                 lr->lr_flags = LEF_TO_OST;
1337                 head = &llmd->llmd_ost_phase1_list;
1338         }
1339
1340         laia->laia_ltds = ltds;
1341         spin_lock(&ltds->ltd_lock);
1342         while (!list_empty(head)) {
1343                 ltd = list_entry(head->next,
1344                                  struct lfsck_tgt_desc,
1345                                  ltd_layout_phase_list);
1346                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1347                         break;
1348
1349                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1350                 list_move_tail(&ltd->ltd_layout_phase_list, head);
1351                 atomic_inc(&ltd->ltd_ref);
1352                 laia->laia_ltd = ltd;
1353                 spin_unlock(&ltds->ltd_lock);
1354                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1355                                          lfsck_layout_master_async_interpret,
1356                                          laia, LFSCK_QUERY);
1357                 if (rc != 0) {
1358                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1359                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1360                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1361                                ltd->ltd_index, rc);
1362                         lfsck_tgt_put(ltd);
1363                         rc1 = rc;
1364                 }
1365                 spin_lock(&ltds->ltd_lock);
1366         }
1367         spin_unlock(&ltds->ltd_lock);
1368
1369         rc = ptlrpc_set_wait(set);
1370         if (rc < 0) {
1371                 ptlrpc_set_destroy(set);
1372                 RETURN(rc);
1373         }
1374
1375         if (!(lr->lr_flags & LEF_TO_OST) &&
1376             list_empty(&llmd->llmd_mdt_phase1_list))
1377                 goto again;
1378
1379         ptlrpc_set_destroy(set);
1380
1381         RETURN(rc1 != 0 ? rc1 : rc);
1382 }
1383
1384 static inline bool
1385 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1386 {
1387         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1388                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1389                 list_empty(&llmd->llmd_ost_phase1_list));
1390 }
1391
1392 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1393                                              struct lfsck_component *com,
1394                                              struct lfsck_request *lr)
1395 {
1396         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1397         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1398         struct lfsck_instance             *lfsck = com->lc_lfsck;
1399         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1400         struct lfsck_layout               *lo    = com->lc_file_ram;
1401         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1402         struct ptlrpc_request_set         *set;
1403         struct lfsck_tgt_descs            *ltds;
1404         struct lfsck_tgt_desc             *ltd;
1405         struct lfsck_tgt_desc             *next;
1406         struct list_head                  *head;
1407         __u32                              idx;
1408         int                                rc    = 0;
1409         ENTRY;
1410
1411         set = ptlrpc_prep_set();
1412         if (set == NULL)
1413                 RETURN(-ENOMEM);
1414
1415         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1416         lr->lr_active = LFSCK_TYPE_LAYOUT;
1417         laia->laia_com = com;
1418         laia->laia_lr = lr;
1419         laia->laia_shared = 0;
1420         switch (lr->lr_event) {
1421         case LE_START:
1422                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1423                 ltds = &lfsck->li_ost_descs;
1424                 laia->laia_ltds = ltds;
1425                 down_read(&ltds->ltd_rw_sem);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = lfsck_tgt_get(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         laia->laia_ltd = ltd;
1431                         ltd->ltd_layout_done = 0;
1432                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1433                                         lfsck_layout_master_async_interpret,
1434                                         laia, LFSCK_NOTIFY);
1435                         if (rc != 0) {
1436                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1437                                        "notify %s %x for start: rc = %d\n",
1438                                        lfsck_lfsck2name(lfsck),
1439                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1440                                        "MDT", idx, rc);
1441                                 lfsck_tgt_put(ltd);
1442                                 lo->ll_flags |= LF_INCOMPLETE;
1443                         }
1444                 }
1445                 up_read(&ltds->ltd_rw_sem);
1446
1447                 /* Sync up */
1448                 rc = ptlrpc_set_wait(set);
1449                 if (rc < 0) {
1450                         ptlrpc_set_destroy(set);
1451                         RETURN(rc);
1452                 }
1453
1454                 if (!(bk->lb_param & LPF_ALL_TGT))
1455                         break;
1456
1457                 /* link other MDT targets locallly. */
1458                 ltds = &lfsck->li_mdt_descs;
1459                 spin_lock(&ltds->ltd_lock);
1460                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1461                         ltd = LTD_TGT(ltds, idx);
1462                         LASSERT(ltd != NULL);
1463
1464                         if (!list_empty(&ltd->ltd_layout_list))
1465                                 continue;
1466
1467                         list_add_tail(&ltd->ltd_layout_list,
1468                                       &llmd->llmd_mdt_list);
1469                         list_add_tail(&ltd->ltd_layout_phase_list,
1470                                       &llmd->llmd_mdt_phase1_list);
1471                 }
1472                 spin_unlock(&ltds->ltd_lock);
1473                 break;
1474         case LE_STOP:
1475         case LE_PHASE2_DONE:
1476         case LE_PEER_EXIT: {
1477                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1478                 if (bk->lb_param & LPF_ALL_TGT) {
1479                         head = &llmd->llmd_mdt_list;
1480                         ltds = &lfsck->li_mdt_descs;
1481                         if (lr->lr_event == LE_STOP) {
1482                                 /* unlink other MDT targets locallly. */
1483                                 spin_lock(&ltds->ltd_lock);
1484                                 list_for_each_entry_safe(ltd, next, head,
1485                                                          ltd_layout_list) {
1486                                         list_del_init(&ltd->ltd_layout_phase_list);
1487                                         list_del_init(&ltd->ltd_layout_list);
1488                                 }
1489                                 spin_unlock(&ltds->ltd_lock);
1490
1491                                 lr->lr_flags |= LEF_TO_OST;
1492                                 head = &llmd->llmd_ost_list;
1493                                 ltds = &lfsck->li_ost_descs;
1494                         } else {
1495                                 lr->lr_flags &= ~LEF_TO_OST;
1496                         }
1497                 } else {
1498                         lr->lr_flags |= LEF_TO_OST;
1499                         head = &llmd->llmd_ost_list;
1500                         ltds = &lfsck->li_ost_descs;
1501                 }
1502
1503 again:
1504                 laia->laia_ltds = ltds;
1505                 spin_lock(&ltds->ltd_lock);
1506                 while (!list_empty(head)) {
1507                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1508                                          ltd_layout_list);
1509                         if (!list_empty(&ltd->ltd_layout_phase_list))
1510                                 list_del_init(&ltd->ltd_layout_phase_list);
1511                         list_del_init(&ltd->ltd_layout_list);
1512                         atomic_inc(&ltd->ltd_ref);
1513                         laia->laia_ltd = ltd;
1514                         spin_unlock(&ltds->ltd_lock);
1515                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1516                                         lfsck_layout_master_async_interpret,
1517                                         laia, LFSCK_NOTIFY);
1518                         if (rc != 0) {
1519                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1520                                        "notify %s %x for stop/phase2_done/"
1521                                        "peer_exit: rc = %d\n",
1522                                        lfsck_lfsck2name(lfsck),
1523                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1524                                        "MDT", ltd->ltd_index, rc);
1525                                 lfsck_tgt_put(ltd);
1526                         }
1527                         spin_lock(&ltds->ltd_lock);
1528                 }
1529                 spin_unlock(&ltds->ltd_lock);
1530
1531                 rc = ptlrpc_set_wait(set);
1532                 if (rc < 0) {
1533                         ptlrpc_set_destroy(set);
1534                         RETURN(rc);
1535                 }
1536
1537                 if (!(lr->lr_flags & LEF_TO_OST)) {
1538                         lr->lr_flags |= LEF_TO_OST;
1539                         head = &llmd->llmd_ost_list;
1540                         ltds = &lfsck->li_ost_descs;
1541                         goto again;
1542                 }
1543                 break;
1544         }
1545         case LE_PHASE1_DONE:
1546                 llmd->llmd_touch_gen++;
1547                 ltds = &lfsck->li_mdt_descs;
1548                 laia->laia_ltds = ltds;
1549                 spin_lock(&ltds->ltd_lock);
1550                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1551                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1552                                          struct lfsck_tgt_desc,
1553                                          ltd_layout_phase_list);
1554                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1555                                 break;
1556
1557                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1558                         list_move_tail(&ltd->ltd_layout_phase_list,
1559                                        &llmd->llmd_mdt_phase1_list);
1560                         atomic_inc(&ltd->ltd_ref);
1561                         laia->laia_ltd = ltd;
1562                         spin_unlock(&ltds->ltd_lock);
1563                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1564                                         lfsck_layout_master_async_interpret,
1565                                         laia, LFSCK_NOTIFY);
1566                         if (rc != 0) {
1567                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1568                                        "notify MDT %x for phase1_done: "
1569                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1570                                        ltd->ltd_index, rc);
1571                                 lfsck_tgt_put(ltd);
1572                         }
1573                         spin_lock(&ltds->ltd_lock);
1574                 }
1575                 spin_unlock(&ltds->ltd_lock);
1576                 break;
1577         default:
1578                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1579                        lfsck_lfsck2name(lfsck), lr->lr_event);
1580                 rc = -EINVAL;
1581                 break;
1582         }
1583
1584         rc = ptlrpc_set_wait(set);
1585         ptlrpc_set_destroy(set);
1586
1587         RETURN(rc);
1588 }
1589
1590 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1591                                            struct lfsck_component *com,
1592                                            int rc)
1593 {
1594         struct lfsck_instance   *lfsck = com->lc_lfsck;
1595         struct lfsck_layout     *lo    = com->lc_file_ram;
1596         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1597
1598         down_write(&com->lc_sem);
1599         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1600                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1601         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1602         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1603
1604         if (rc > 0) {
1605                 com->lc_journal = 0;
1606                 if (lo->ll_flags & LF_INCOMPLETE)
1607                         lo->ll_status = LS_PARTIAL;
1608                 else
1609                         lo->ll_status = LS_COMPLETED;
1610                 if (!(bk->lb_param & LPF_DRYRUN))
1611                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1612                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1613                 lo->ll_success_count++;
1614         } else if (rc == 0) {
1615                 lo->ll_status = lfsck->li_status;
1616                 if (lo->ll_status == 0)
1617                         lo->ll_status = LS_STOPPED;
1618         } else {
1619                 lo->ll_status = LS_FAILED;
1620         }
1621
1622         rc = lfsck_layout_store(env, com);
1623         up_write(&com->lc_sem);
1624
1625         return rc;
1626 }
1627
1628 static int lfsck_layout_lock(const struct lu_env *env,
1629                              struct lfsck_component *com,
1630                              struct dt_object *obj,
1631                              struct lustre_handle *lh, __u64 bits)
1632 {
1633         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1634         ldlm_policy_data_t              *policy = &info->lti_policy;
1635         struct ldlm_res_id              *resid  = &info->lti_resid;
1636         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1637         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1638         int                              rc;
1639
1640         LASSERT(lfsck->li_namespace != NULL);
1641
1642         memset(policy, 0, sizeof(*policy));
1643         policy->l_inodebits.bits = bits;
1644         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1645         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1646                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1647                                     ldlm_completion_ast, NULL, NULL, 0,
1648                                     LVB_T_NONE, NULL, lh);
1649         if (rc == ELDLM_OK) {
1650                 rc = 0;
1651         } else {
1652                 memset(lh, 0, sizeof(*lh));
1653                 rc = -EIO;
1654         }
1655
1656         return rc;
1657 }
1658
1659 static void lfsck_layout_unlock(struct lustre_handle *lh)
1660 {
1661         if (lustre_handle_is_used(lh)) {
1662                 ldlm_lock_decref(lh, LCK_EX);
1663                 memset(lh, 0, sizeof(*lh));
1664         }
1665 }
1666
1667 static int lfsck_layout_trans_stop(const struct lu_env *env,
1668                                    struct dt_device *dev,
1669                                    struct thandle *handle, int result)
1670 {
1671         int rc;
1672
1673         handle->th_result = result;
1674         rc = dt_trans_stop(env, dev, handle);
1675         if (rc > 0)
1676                 rc = 0;
1677         else if (rc == 0)
1678                 rc = 1;
1679
1680         return rc;
1681 }
1682
1683 /**
1684  * Get the system default stripe size.
1685  *
1686  * \param[in] env       pointer to the thread context
1687  * \param[in] lfsck     pointer to the lfsck instance
1688  * \param[out] size     pointer to the default stripe size
1689  *
1690  * \retval              0 for success
1691  * \retval              negative error number on failure
1692  */
1693 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1694                                            struct lfsck_instance *lfsck,
1695                                            __u32 *size)
1696 {
1697         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1698         struct dt_object        *root;
1699         int                      rc;
1700
1701         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1702         if (IS_ERR(root))
1703                 return PTR_ERR(root);
1704
1705         /* Get the default stripe size via xattr_get on the backend root. */
1706         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1707                           XATTR_NAME_LOV, BYPASS_CAPA);
1708         if (rc > 0) {
1709                 /* The lum->lmm_stripe_size is LE mode. The *size also
1710                  * should be LE mode. So it is unnecessary to convert. */
1711                 *size = lum->lmm_stripe_size;
1712                 rc = 0;
1713         } else if (unlikely(rc == 0)) {
1714                 rc = -EINVAL;
1715         }
1716
1717         lfsck_object_put(env, root);
1718
1719         return rc;
1720 }
1721
1722 /**
1723  * \retval       +1: repaired
1724  * \retval        0: did nothing
1725  * \retval      -ve: on error
1726  */
1727 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1728                                      struct thandle *handle,
1729                                      struct dt_object *parent,
1730                                      struct lu_fid *cfid,
1731                                      struct lu_buf *buf,
1732                                      struct lov_ost_data_v1 *slot,
1733                                      int fl, __u32 ost_idx)
1734 {
1735         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1736         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1737         struct lu_buf            ea_buf;
1738         int                      rc;
1739         __u32                    magic;
1740         __u16                    count;
1741
1742         magic = le32_to_cpu(lmm->lmm_magic);
1743         count = le16_to_cpu(lmm->lmm_stripe_count);
1744
1745         fid_to_ostid(cfid, oi);
1746         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1747         slot->l_ost_gen = cpu_to_le32(0);
1748         slot->l_ost_idx = cpu_to_le32(ost_idx);
1749
1750         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1751                 struct lov_ost_data_v1 *objs;
1752                 int                     i;
1753
1754                 if (magic == LOV_MAGIC_V1)
1755                         objs = &lmm->lmm_objects[0];
1756                 else
1757                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1758                 for (i = 0; i < count; i++, objs++) {
1759                         if (objs != slot && lovea_slot_is_dummy(objs))
1760                                 break;
1761                 }
1762
1763                 /* If the @slot is the last dummy slot to be refilled,
1764                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1765                 if (i == count)
1766                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1767         }
1768
1769         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1770         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1771                           BYPASS_CAPA);
1772         if (rc == 0)
1773                 rc = 1;
1774
1775         return rc;
1776 }
1777
1778 /**
1779  * \retval       +1: repaired
1780  * \retval        0: did nothing
1781  * \retval      -ve: on error
1782  */
1783 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1784                                      struct lfsck_instance *lfsck,
1785                                      struct thandle *handle,
1786                                      struct dt_object *parent,
1787                                      struct lu_fid *cfid,
1788                                      struct lu_buf *buf, int fl,
1789                                      __u32 ost_idx, __u32 ea_off, bool reset)
1790 {
1791         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1792         struct lov_ost_data_v1  *objs;
1793         int                      rc;
1794         __u16                    count;
1795         bool                     hole   = false;
1796         ENTRY;
1797
1798         if (fl == LU_XATTR_CREATE || reset) {
1799                 __u32 pattern = LOV_PATTERN_RAID0;
1800
1801                 count = ea_off + 1;
1802                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1803
1804                 if (ea_off != 0 || reset) {
1805                         pattern |= LOV_PATTERN_F_HOLE;
1806                         hole = true;
1807                 }
1808
1809                 memset(lmm, 0, buf->lb_len);
1810                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1811                 lmm->lmm_pattern = cpu_to_le32(pattern);
1812                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1813                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1814
1815                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1816                                                      &lmm->lmm_stripe_size);
1817                 if (rc != 0)
1818                         RETURN(rc);
1819
1820                 objs = &lmm->lmm_objects[ea_off];
1821         } else {
1822                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1823                 int     gap;
1824
1825                 count = le16_to_cpu(lmm->lmm_stripe_count);
1826                 if (magic == LOV_MAGIC_V1)
1827                         objs = &lmm->lmm_objects[count];
1828                 else
1829                         objs = &((struct lov_mds_md_v3 *)lmm)->
1830                                                         lmm_objects[count];
1831
1832                 gap = ea_off - count;
1833                 if (gap >= 0)
1834                         count = ea_off + 1;
1835                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1836
1837                 if (gap > 0) {
1838                         memset(objs, 0, gap * sizeof(*objs));
1839                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1840                         hole = true;
1841                 }
1842
1843                 lmm->lmm_layout_gen =
1844                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1845                 objs += gap;
1846         }
1847
1848         lmm->lmm_stripe_count = cpu_to_le16(count);
1849         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1850                                        fl, ost_idx);
1851
1852         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1853                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1854                "reset %s, %s LOV EA hole: rc = %d\n",
1855                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1856                ost_idx, ea_off, fl, reset ? "yes" : "no",
1857                hole ? "with" : "without", rc);
1858
1859         RETURN(rc);
1860 }
1861
1862 /**
1863  * \retval       +1: repaired
1864  * \retval        0: did nothing
1865  * \retval      -ve: on error
1866  */
1867 static int lfsck_layout_update_pfid(const struct lu_env *env,
1868                                     struct lfsck_component *com,
1869                                     struct dt_object *parent,
1870                                     struct lu_fid *cfid,
1871                                     struct dt_device *cdev, __u32 ea_off)
1872 {
1873         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1874         struct dt_object        *child;
1875         struct thandle          *handle;
1876         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1877         struct lu_buf           *buf;
1878         int                      rc     = 0;
1879         ENTRY;
1880
1881         child = lfsck_object_find_by_dev(env, cdev, cfid);
1882         if (IS_ERR(child))
1883                 RETURN(PTR_ERR(child));
1884
1885         handle = dt_trans_create(env, cdev);
1886         if (IS_ERR(handle))
1887                 GOTO(out, rc = PTR_ERR(handle));
1888
1889         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1890         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1891         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1892          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1893          * parent MDT-object's layout EA. */
1894         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1895         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1896
1897         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1898         if (rc != 0)
1899                 GOTO(stop, rc);
1900
1901         rc = dt_trans_start(env, cdev, handle);
1902         if (rc != 0)
1903                 GOTO(stop, rc);
1904
1905         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1906                           BYPASS_CAPA);
1907
1908         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1909
1910 stop:
1911         dt_trans_stop(env, cdev, handle);
1912
1913 out:
1914         lu_object_put(env, &child->do_lu);
1915
1916         return rc;
1917 }
1918
1919 /**
1920  * This function will create the MDT-object with the given (partial) LOV EA.
1921  *
1922  * Under some data corruption cases, the MDT-object of the file may be lost,
1923  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1924  * re-create the MDT-object with the orphan OST-object(s) information.
1925  *
1926  * On the other hand, the LFSCK may has created some OST-object for repairing
1927  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1928  * the old OST-object is there and should replace the former new created OST
1929  * object. Unfortunately, some others have modified such newly created object.
1930  * To keep the data (both new and old), the LFSCK will create MDT-object with
1931  * new FID to reference the original OST-object.
1932  *
1933  * \param[in] env       pointer to the thread context
1934  * \param[in] com       pointer to the lfsck component
1935  * \param[in] ltd       pointer to target device descriptor
1936  * \param[in] rec       pointer to the record for the orphan OST-object
1937  * \param[in] cfid      pointer to FID for the orphan OST-object
1938  * \param[in] infix     additional information, such as the FID for original
1939  *                      MDT-object and the stripe offset in the LOV EA
1940  * \param[in] type      the type for describing why the orphan MDT-object is
1941  *                      created. The rules are as following:
1942  *
1943  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1944  *                      same slot in the layout EA. Then the LFSCK will create
1945  *                      new MDT-object(s) to hold the conflict OST-object(s).
1946  *
1947  *  type "N":           The orphan OST-object does not know which one was the
1948  *                      real parent MDT-object, so the LFSCK uses new FID for
1949  *                      its parent MDT-object.
1950  *
1951  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1952  *                      but does not know the position (the file name) in the
1953  *                      namespace.
1954  *
1955  * The orphan name will be like:
1956  * ${FID}-${infix}-${type}-${conflict_version}
1957  *
1958  * \param[in] ea_off    the stripe offset in the LOV EA
1959  *
1960  * \retval              positive on repaired something
1961  * \retval              0 if needs to repair nothing
1962  * \retval              negative error number on failure
1963  */
1964 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1965                                         struct lfsck_component *com,
1966                                         struct lfsck_tgt_desc *ltd,
1967                                         struct lu_orphan_rec *rec,
1968                                         struct lu_fid *cfid,
1969                                         const char *infix,
1970                                         const char *type,
1971                                         __u32 ea_off)
1972 {
1973         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1974         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1975         char                            *name   = info->lti_key;
1976         struct lu_attr                  *la     = &info->lti_la;
1977         struct dt_object_format         *dof    = &info->lti_dof;
1978         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1979         struct lu_fid                   *pfid   = &rec->lor_fid;
1980         struct lu_fid                   *tfid   = &info->lti_fid3;
1981         struct dt_device                *next   = lfsck->li_next;
1982         struct dt_object                *pobj   = NULL;
1983         struct dt_object                *cobj   = NULL;
1984         struct thandle                  *th     = NULL;
1985         struct lu_buf                    pbuf   = { 0 };
1986         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1987         struct lu_buf                    lov_buf;
1988         struct lustre_handle             lh     = { 0 };
1989         struct linkea_data               ldata  = { 0 };
1990         struct lu_buf                    linkea_buf;
1991         const struct lu_name            *pname;
1992         int                              size   = 0;
1993         int                              idx    = 0;
1994         int                              rc     = 0;
1995         ENTRY;
1996
1997         /* Create .lustre/lost+found/MDTxxxx when needed. */
1998         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1999                 rc = lfsck_create_lpf(env, lfsck);
2000                 if (rc != 0)
2001                         GOTO(log, rc);
2002         }
2003
2004         if (fid_is_zero(pfid)) {
2005                 struct filter_fid *ff = &info->lti_new_pfid;
2006
2007                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2008                 if (rc != 0)
2009                         RETURN(rc);
2010
2011                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2012                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2013                 /* Currently, the filter_fid::ff_parent::f_ver is not the
2014                  * real parent MDT-object's FID::f_ver, instead it is the
2015                  * OST-object index in its parent MDT-object's layout EA. */
2016                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
2017                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
2018                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2019                 if (IS_ERR(cobj))
2020                         GOTO(log, rc = PTR_ERR(cobj));
2021         }
2022
2023         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
2024         if (IS_ERR(pobj))
2025                 GOTO(put, rc = PTR_ERR(pobj));
2026
2027         LASSERT(infix != NULL);
2028         LASSERT(type != NULL);
2029
2030         do {
2031                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2032                          type, idx++);
2033                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2034                                (const struct dt_key *)name, BYPASS_CAPA);
2035                 if (rc != 0 && rc != -ENOENT)
2036                         GOTO(put, rc);
2037         } while (rc == 0);
2038
2039         rc = linkea_data_new(&ldata,
2040                              &lfsck_env_info(env)->lti_linkea_buf);
2041         if (rc != 0)
2042                 GOTO(put, rc);
2043
2044         pname = lfsck_name_get_const(env, name, strlen(name));
2045         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2046         if (rc != 0)
2047                 GOTO(put, rc);
2048
2049         memset(la, 0, sizeof(*la));
2050         la->la_uid = rec->lor_uid;
2051         la->la_gid = rec->lor_gid;
2052         la->la_mode = S_IFREG | S_IRUSR;
2053         la->la_valid = LA_MODE | LA_UID | LA_GID;
2054
2055         memset(dof, 0, sizeof(*dof));
2056         dof->dof_type = dt_mode_to_dft(S_IFREG);
2057
2058         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2059         if (ea_buf->lb_len < size) {
2060                 lu_buf_realloc(ea_buf, size);
2061                 if (ea_buf->lb_buf == NULL)
2062                         GOTO(put, rc = -ENOMEM);
2063         }
2064
2065         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2066          *
2067          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2068          *      because creating MDT-object for orphan OST-object is rare, we
2069          *      do not much care about the performance. It can be improved in
2070          *      the future when needed. */
2071         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
2072                                MDS_INODELOCK_UPDATE);
2073         if (rc != 0)
2074                 GOTO(put, rc);
2075
2076         th = dt_trans_create(env, next);
2077         if (IS_ERR(th))
2078                 GOTO(unlock, rc = PTR_ERR(th));
2079
2080         /* 1a. Update OST-object's parent information remotely.
2081          *
2082          * If other subsequent modifications failed, then next LFSCK scanning
2083          * will process the OST-object as orphan again with known parent FID. */
2084         if (cobj != NULL) {
2085                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
2086                                           0, th);
2087                 if (rc != 0)
2088                         GOTO(stop, rc);
2089         }
2090
2091         /* 2a. Create the MDT-object locally. */
2092         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2093         if (rc != 0)
2094                 GOTO(stop, rc);
2095
2096         /* 3a. Add layout EA for the MDT-object. */
2097         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2098         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2099                                   LU_XATTR_CREATE, th);
2100         if (rc != 0)
2101                 GOTO(stop, rc);
2102
2103         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2104         dtrec->rec_fid = pfid;
2105         dtrec->rec_type = S_IFREG;
2106         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2107                                (const struct dt_rec *)dtrec,
2108                                (const struct dt_key *)name, th);
2109         if (rc != 0)
2110                 GOTO(stop, rc);
2111
2112         /* 5a. insert linkEA for parent. */
2113         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2114                        ldata.ld_leh->leh_len);
2115         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2116                                   XATTR_NAME_LINK, 0, th);
2117         if (rc != 0)
2118                 GOTO(stop, rc);
2119
2120         rc = dt_trans_start(env, next, th);
2121         if (rc != 0)
2122                 GOTO(stop, rc);
2123
2124         /* 1b. Update OST-object's parent information remotely. */
2125         if (cobj != NULL) {
2126                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
2127                                   BYPASS_CAPA);
2128                 if (rc != 0)
2129                         GOTO(stop, rc);
2130         }
2131
2132         dt_write_lock(env, pobj, 0);
2133         /* 2b. Create the MDT-object locally. */
2134         rc = dt_create(env, pobj, la, NULL, dof, th);
2135         if (rc == 0)
2136                 /* 3b. Add layout EA for the MDT-object. */
2137                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2138                                                &lov_buf, LU_XATTR_CREATE,
2139                                                ltd->ltd_index, ea_off, false);
2140         dt_write_unlock(env, pobj);
2141         if (rc < 0)
2142                 GOTO(stop, rc);
2143
2144         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2145         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
2146                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2147         if (rc != 0)
2148                 GOTO(stop, rc);
2149
2150         /* 5b. insert linkEA for parent. */
2151         rc = dt_xattr_set(env, pobj, &linkea_buf,
2152                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2153
2154         GOTO(stop, rc);
2155
2156 stop:
2157         dt_trans_stop(env, next, th);
2158
2159 unlock:
2160         lfsck_layout_unlock(&lh);
2161
2162 put:
2163         if (cobj != NULL && !IS_ERR(cobj))
2164                 lu_object_put(env, &cobj->do_lu);
2165         if (pobj != NULL && !IS_ERR(pobj))
2166                 lu_object_put(env, &pobj->do_lu);
2167
2168 log:
2169         if (rc < 0)
2170                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2171                        "recreate the lost MDT-object: parent "DFID
2172                        ", child "DFID", OST-index %u, stripe-index %u, "
2173                        "infix %s, type %s: rc = %d\n",
2174                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2175                        ltd->ltd_index, ea_off, infix, type, rc);
2176
2177         return rc >= 0 ? 1 : rc;
2178 }
2179
2180 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2181                                                    struct lfsck_component *com,
2182                                                    const struct lu_fid *fid,
2183                                                    __u32 index)
2184 {
2185         struct lfsck_thread_info *info  = lfsck_env_info(env);
2186         struct lfsck_request     *lr    = &info->lti_lr;
2187         struct lfsck_instance    *lfsck = com->lc_lfsck;
2188         struct lfsck_tgt_desc    *ltd;
2189         struct ptlrpc_request    *req;
2190         struct lfsck_request     *tmp;
2191         struct obd_export        *exp;
2192         int                       rc    = 0;
2193         ENTRY;
2194
2195         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2196         if (unlikely(ltd == NULL))
2197                 RETURN(-ENXIO);
2198
2199         exp = ltd->ltd_exp;
2200         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2201                 GOTO(put, rc = -EOPNOTSUPP);
2202
2203         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2204         if (req == NULL)
2205                 GOTO(put, rc = -ENOMEM);
2206
2207         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2208         if (rc != 0) {
2209                 ptlrpc_request_free(req);
2210
2211                 GOTO(put, rc);
2212         }
2213
2214         memset(lr, 0, sizeof(*lr));
2215         lr->lr_event = LE_CONDITIONAL_DESTROY;
2216         lr->lr_active = LFSCK_TYPE_LAYOUT;
2217         lr->lr_fid = *fid;
2218
2219         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2220         *tmp = *lr;
2221         ptlrpc_request_set_replen(req);
2222
2223         rc = ptlrpc_queue_wait(req);
2224         ptlrpc_req_finished(req);
2225
2226         GOTO(put, rc);
2227
2228 put:
2229         lfsck_tgt_put(ltd);
2230
2231         return rc;
2232 }
2233
2234 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2235                                                   struct lfsck_component *com,
2236                                                   struct lfsck_request *lr)
2237 {
2238         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2239         struct lu_attr                  *la     = &info->lti_la;
2240         ldlm_policy_data_t              *policy = &info->lti_policy;
2241         struct ldlm_res_id              *resid  = &info->lti_resid;
2242         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2243         struct dt_device                *dev    = lfsck->li_bottom;
2244         struct lu_fid                   *fid    = &lr->lr_fid;
2245         struct dt_object                *obj;
2246         struct thandle                  *th     = NULL;
2247         struct lustre_handle             lh     = { 0 };
2248         __u64                            flags  = 0;
2249         int                              rc     = 0;
2250         ENTRY;
2251
2252         obj = lfsck_object_find_by_dev(env, dev, fid);
2253         if (IS_ERR(obj))
2254                 RETURN(PTR_ERR(obj));
2255
2256         dt_read_lock(env, obj, 0);
2257         if (dt_object_exists(obj) == 0 ||
2258             lfsck_is_dead_obj(obj)) {
2259                 dt_read_unlock(env, obj);
2260
2261                 GOTO(put, rc = -ENOENT);
2262         }
2263
2264         /* Get obj's attr without lock firstly. */
2265         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2266         dt_read_unlock(env, obj);
2267         if (rc != 0)
2268                 GOTO(put, rc);
2269
2270         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2271                 GOTO(put, rc = -ETXTBSY);
2272
2273         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2274         LASSERT(lfsck->li_namespace != NULL);
2275
2276         memset(policy, 0, sizeof(*policy));
2277         policy->l_extent.end = OBD_OBJECT_EOF;
2278         ost_fid_build_resid(fid, resid);
2279         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2280                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2281                                     ldlm_completion_ast, NULL, NULL, 0,
2282                                     LVB_T_NONE, NULL, &lh);
2283         if (rc != ELDLM_OK)
2284                 GOTO(put, rc = -EIO);
2285
2286         dt_write_lock(env, obj, 0);
2287         /* Get obj's attr within lock again. */
2288         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2289         if (rc != 0)
2290                 GOTO(unlock, rc);
2291
2292         if (la->la_ctime != 0)
2293                 GOTO(unlock, rc = -ETXTBSY);
2294
2295         th = dt_trans_create(env, dev);
2296         if (IS_ERR(th))
2297                 GOTO(unlock, rc = PTR_ERR(th));
2298
2299         rc = dt_declare_ref_del(env, obj, th);
2300         if (rc != 0)
2301                 GOTO(stop, rc);
2302
2303         rc = dt_declare_destroy(env, obj, th);
2304         if (rc != 0)
2305                 GOTO(stop, rc);
2306
2307         rc = dt_trans_start_local(env, dev, th);
2308         if (rc != 0)
2309                 GOTO(stop, rc);
2310
2311         rc = dt_ref_del(env, obj, th);
2312         if (rc != 0)
2313                 GOTO(stop, rc);
2314
2315         rc = dt_destroy(env, obj, th);
2316         if (rc == 0)
2317                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2318                        "OST-object "DFID" that was created for reparing "
2319                        "dangling referenced case. But the original missed "
2320                        "OST-object is found now.\n",
2321                        lfsck_lfsck2name(lfsck), PFID(fid));
2322
2323         GOTO(stop, rc);
2324
2325 stop:
2326         dt_trans_stop(env, dev, th);
2327
2328 unlock:
2329         dt_write_unlock(env, obj);
2330         ldlm_lock_decref(&lh, LCK_EX);
2331
2332 put:
2333         lu_object_put(env, &obj->do_lu);
2334
2335         return rc;
2336 }
2337
2338 /**
2339  * Some OST-object has occupied the specified layout EA slot.
2340  * Such OST-object may be generated by the LFSCK when repair
2341  * dangling referenced MDT-object, which can be indicated by
2342  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2343  * is true and such OST-object has not been modified yet, we
2344  * will replace it with the orphan OST-object; otherwise the
2345  * LFSCK will create new MDT-object to reference the orphan.
2346  *
2347  * \retval       +1: repaired
2348  * \retval        0: did nothing
2349  * \retval      -ve: on error
2350  */
2351 static int lfsck_layout_conflict_create(const struct lu_env *env,
2352                                         struct lfsck_component *com,
2353                                         struct lfsck_tgt_desc *ltd,
2354                                         struct lu_orphan_rec *rec,
2355                                         struct dt_object *parent,
2356                                         struct lu_fid *cfid,
2357                                         struct lu_buf *ea_buf,
2358                                         struct lov_ost_data_v1 *slot,
2359                                         __u32 ea_off)
2360 {
2361         struct lfsck_thread_info *info          = lfsck_env_info(env);
2362         struct lu_fid            *cfid2         = &info->lti_fid2;
2363         struct ost_id            *oi            = &info->lti_oi;
2364         char                     *infix         = info->lti_tmpbuf;
2365         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2366         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2367         struct thandle           *th            = NULL;
2368         struct lustre_handle      lh            = { 0 };
2369         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2370         int                       rc            = 0;
2371         ENTRY;
2372
2373         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2374         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2375         if (rc != 0)
2376                 GOTO(out, rc);
2377
2378         /* Hold layout lock on the parent to prevent others to access. */
2379         rc = lfsck_layout_lock(env, com, parent, &lh,
2380                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2381         if (rc != 0)
2382                 GOTO(out, rc);
2383
2384         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2385
2386         /* If the conflict OST-obejct is not created for fixing dangling
2387          * referenced MDT-object in former LFSCK check/repair, or it has
2388          * been modified by others, then we cannot destroy it. Re-create
2389          * a new MDT-object for the orphan OST-object. */
2390         if (rc == -ETXTBSY) {
2391                 /* No need the layout lock on the original parent. */
2392                 lfsck_layout_unlock(&lh);
2393
2394                 fid_zero(&rec->lor_fid);
2395                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2396                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2397                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2398                                                   infix, "C", ea_off);
2399
2400                 RETURN(rc);
2401         }
2402
2403         if (rc != 0 && rc != -ENOENT)
2404                 GOTO(unlock, rc);
2405
2406         th = dt_trans_create(env, dev);
2407         if (IS_ERR(th))
2408                 GOTO(unlock, rc = PTR_ERR(th));
2409
2410         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2411                                   LU_XATTR_REPLACE, th);
2412         if (rc != 0)
2413                 GOTO(stop, rc);
2414
2415         rc = dt_trans_start_local(env, dev, th);
2416         if (rc != 0)
2417                 GOTO(stop, rc);
2418
2419         dt_write_lock(env, parent, 0);
2420         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2421         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2422                                        LU_XATTR_REPLACE, ltd->ltd_index);
2423         dt_write_unlock(env, parent);
2424
2425         GOTO(stop, rc);
2426
2427 stop:
2428         dt_trans_stop(env, dev, th);
2429
2430 unlock:
2431         lfsck_layout_unlock(&lh);
2432
2433 out:
2434         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2435                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2436                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2437                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2438                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2439                ea_off, rc);
2440
2441         return rc >= 0 ? 1 : rc;
2442 }
2443
2444 /**
2445  * \retval       +1: repaired
2446  * \retval        0: did nothing
2447  * \retval      -ve: on error
2448  */
2449 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2450                                        struct lfsck_component *com,
2451                                        struct lfsck_tgt_desc *ltd,
2452                                        struct lu_orphan_rec *rec,
2453                                        struct dt_object *parent,
2454                                        struct lu_fid *cfid,
2455                                        __u32 ost_idx, __u32 ea_off)
2456 {
2457         struct lfsck_thread_info *info          = lfsck_env_info(env);
2458         struct lu_buf            *buf           = &info->lti_big_buf;
2459         struct lu_fid            *fid           = &info->lti_fid2;
2460         struct ost_id            *oi            = &info->lti_oi;
2461         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2462         struct dt_device         *dt            = lfsck->li_bottom;
2463         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2464         struct thandle            *handle       = NULL;
2465         size_t                    lovea_size;
2466         struct lov_mds_md_v1     *lmm;
2467         struct lov_ost_data_v1   *objs;
2468         struct lustre_handle      lh            = { 0 };
2469         __u32                     magic;
2470         int                       fl            = 0;
2471         int                       rc            = 0;
2472         int                       rc1;
2473         int                       i;
2474         __u16                     count;
2475         bool                      locked        = false;
2476         ENTRY;
2477
2478         rc = lfsck_layout_lock(env, com, parent, &lh,
2479                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2480         if (rc != 0) {
2481                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2482                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2483                        "stripe-index %u: rc = %d\n",
2484                        lfsck_lfsck2name(lfsck), PFID(cfid),
2485                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2486
2487                 RETURN(rc);
2488         }
2489
2490 again:
2491         if (locked) {
2492                 dt_write_unlock(env, parent);
2493                 locked = false;
2494         }
2495
2496         if (handle != NULL) {
2497                 dt_trans_stop(env, dt, handle);
2498                 handle = NULL;
2499         }
2500
2501         if (rc < 0)
2502                 GOTO(unlock_layout, rc);
2503
2504         lovea_size = rc;
2505         if (buf->lb_len < lovea_size) {
2506                 lu_buf_realloc(buf, lovea_size);
2507                 if (buf->lb_buf == NULL)
2508                         GOTO(unlock_layout, rc = -ENOMEM);
2509         }
2510
2511         if (!(bk->lb_param & LPF_DRYRUN)) {
2512                 handle = dt_trans_create(env, dt);
2513                 if (IS_ERR(handle))
2514                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2515
2516                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2517                                           fl, handle);
2518                 if (rc != 0)
2519                         GOTO(stop, rc);
2520
2521                 rc = dt_trans_start_local(env, dt, handle);
2522                 if (rc != 0)
2523                         GOTO(stop, rc);
2524         }
2525
2526         dt_write_lock(env, parent, 0);
2527         locked = true;
2528         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2529         if (rc == -ERANGE) {
2530                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2531                                   BYPASS_CAPA);
2532                 LASSERT(rc != 0);
2533                 goto again;
2534         } else if (rc == -ENODATA || rc == 0) {
2535                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2536                 /* If the declared is not big enough, re-try. */
2537                 if (buf->lb_len < lovea_size) {
2538                         rc = lovea_size;
2539                         goto again;
2540                 }
2541                 fl = LU_XATTR_CREATE;
2542         } else if (rc < 0) {
2543                 GOTO(unlock_parent, rc);
2544         } else if (unlikely(buf->lb_len == 0)) {
2545                 goto again;
2546         } else {
2547                 fl = LU_XATTR_REPLACE;
2548                 lovea_size = rc;
2549         }
2550
2551         if (fl == LU_XATTR_CREATE) {
2552                 if (bk->lb_param & LPF_DRYRUN)
2553                         GOTO(unlock_parent, rc = 1);
2554
2555                 LASSERT(buf->lb_len >= lovea_size);
2556
2557                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2558                                                buf, fl, ost_idx, ea_off, false);
2559
2560                 GOTO(unlock_parent, rc);
2561         }
2562
2563         lmm = buf->lb_buf;
2564         rc1 = lfsck_layout_verify_header(lmm);
2565
2566         /* If the LOV EA crashed, the rebuild it. */
2567         if (rc1 == -EINVAL) {
2568                 if (bk->lb_param & LPF_DRYRUN)
2569                         GOTO(unlock_parent, rc = 1);
2570
2571                 LASSERT(buf->lb_len >= lovea_size);
2572
2573                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2574                                                buf, fl, ost_idx, ea_off, true);
2575
2576                 GOTO(unlock_parent, rc);
2577         }
2578
2579         /* For other unknown magic/pattern, keep the current LOV EA. */
2580         if (rc1 != 0)
2581                 GOTO(unlock_parent, rc = rc1);
2582
2583         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2584          * been verified in lfsck_layout_verify_header() already. If some
2585          * new magic introduced in the future, then layout LFSCK needs to
2586          * be updated also. */
2587         magic = le32_to_cpu(lmm->lmm_magic);
2588         if (magic == LOV_MAGIC_V1) {
2589                 objs = &lmm->lmm_objects[0];
2590         } else {
2591                 LASSERT(magic == LOV_MAGIC_V3);
2592                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2593         }
2594
2595         count = le16_to_cpu(lmm->lmm_stripe_count);
2596         if (count == 0)
2597                 GOTO(unlock_parent, rc = -EINVAL);
2598         LASSERT(count > 0);
2599
2600         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2601         if (count <= ea_off) {
2602                 if (bk->lb_param & LPF_DRYRUN)
2603                         GOTO(unlock_parent, rc = 1);
2604
2605                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2606                 /* If the declared is not big enough, re-try. */
2607                 if (buf->lb_len < lovea_size) {
2608                         rc = lovea_size;
2609                         goto again;
2610                 }
2611
2612                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2613                                                buf, fl, ost_idx, ea_off, false);
2614
2615                 GOTO(unlock_parent, rc);
2616         }
2617
2618         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2619
2620         for (i = 0; i < count; i++, objs++) {
2621                 /* The MDT-object was created via lfsck_layout_recover_create()
2622                  * by others before, and we fill the dummy layout EA. */
2623                 if (lovea_slot_is_dummy(objs)) {
2624                         if (i != ea_off)
2625                                 continue;
2626
2627                         if (bk->lb_param & LPF_DRYRUN)
2628                                 GOTO(unlock_parent, rc = 1);
2629
2630                         lmm->lmm_layout_gen =
2631                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2632                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2633                                                        cfid, buf, objs, fl,
2634                                                        ost_idx);
2635
2636                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2637                                "dummy layout slot for "DFID": parent "DFID
2638                                ", OST-index %u, stripe-index %u: rc = %d\n",
2639                                lfsck_lfsck2name(lfsck), PFID(cfid),
2640                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2641
2642                         GOTO(unlock_parent, rc);
2643                 }
2644
2645                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2646                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2647                 if (rc != 0) {
2648                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2649                                "invalid layout EA at the slot %d, index %u\n",
2650                                lfsck_lfsck2name(lfsck),
2651                                PFID(lfsck_dto2fid(parent)), i,
2652                                le32_to_cpu(objs->l_ost_idx));
2653
2654                         GOTO(unlock_parent, rc);
2655                 }
2656
2657                 /* It should be rare case, the slot is there, but the LFSCK
2658                  * does not handle it during the first-phase cycle scanning. */
2659                 if (unlikely(lu_fid_eq(fid, cfid))) {
2660                         if (i == ea_off) {
2661                                 GOTO(unlock_parent, rc = 0);
2662                         } else {
2663                                 /* Rare case that the OST-object index
2664                                  * does not match the parent MDT-object
2665                                  * layout EA. We trust the later one. */
2666                                 if (bk->lb_param & LPF_DRYRUN)
2667                                         GOTO(unlock_parent, rc = 1);
2668
2669                                 dt_write_unlock(env, parent);
2670                                 if (handle != NULL)
2671                                         dt_trans_stop(env, dt, handle);
2672                                 lfsck_layout_unlock(&lh);
2673                                 rc = lfsck_layout_update_pfid(env, com, parent,
2674                                                         cfid, ltd->ltd_tgt, i);
2675
2676                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2677                                        "updated OST-object's pfid for "DFID
2678                                        ": parent "DFID", OST-index %u, "
2679                                        "stripe-index %u: rc = %d\n",
2680                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2681                                        PFID(lfsck_dto2fid(parent)),
2682                                        ltd->ltd_index, i, rc);
2683
2684                                 RETURN(rc);
2685                         }
2686                 }
2687         }
2688
2689         /* The MDT-object exists, but related layout EA slot is occupied
2690          * by others. */
2691         if (bk->lb_param & LPF_DRYRUN)
2692                 GOTO(unlock_parent, rc = 1);
2693
2694         dt_write_unlock(env, parent);
2695         if (handle != NULL)
2696                 dt_trans_stop(env, dt, handle);
2697         lfsck_layout_unlock(&lh);
2698         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2699                 objs = &lmm->lmm_objects[ea_off];
2700         else
2701                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2702         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2703                                           buf, objs, ea_off);
2704
2705         RETURN(rc);
2706
2707 unlock_parent:
2708         if (locked)
2709                 dt_write_unlock(env, parent);
2710
2711 stop:
2712         if (handle != NULL)
2713                 dt_trans_stop(env, dt, handle);
2714
2715 unlock_layout:
2716         lfsck_layout_unlock(&lh);
2717
2718         return rc;
2719 }
2720
2721 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2722                                         struct lfsck_component *com,
2723                                         struct lfsck_tgt_desc *ltd,
2724                                         struct lu_orphan_rec *rec,
2725                                         struct lu_fid *cfid)
2726 {
2727         struct lfsck_layout     *lo     = com->lc_file_ram;
2728         struct lu_fid           *pfid   = &rec->lor_fid;
2729         struct dt_object        *parent = NULL;
2730         __u32                    ea_off = pfid->f_stripe_idx;
2731         int                      rc     = 0;
2732         ENTRY;
2733
2734         if (!fid_is_sane(cfid))
2735                 GOTO(out, rc = -EINVAL);
2736
2737         if (fid_is_zero(pfid)) {
2738                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2739                                                   "", "N", ea_off);
2740                 GOTO(out, rc);
2741         }
2742
2743         pfid->f_ver = 0;
2744         if (!fid_is_sane(pfid))
2745                 GOTO(out, rc = -EINVAL);
2746
2747         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2748         if (IS_ERR(parent))
2749                 GOTO(out, rc = PTR_ERR(parent));
2750
2751         if (unlikely(dt_object_remote(parent) != 0))
2752                 GOTO(put, rc = -EXDEV);
2753
2754         if (dt_object_exists(parent) == 0) {
2755                 lu_object_put(env, &parent->do_lu);
2756                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2757                                                   "", "R", ea_off);
2758                 GOTO(out, rc);
2759         }
2760
2761         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2762                 GOTO(put, rc = -EISDIR);
2763
2764         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2765                                          ltd->ltd_index, ea_off);
2766
2767         GOTO(put, rc);
2768
2769 put:
2770         if (rc <= 0)
2771                 lu_object_put(env, &parent->do_lu);
2772         else
2773                 /* The layout EA is changed, need to be reloaded next time. */
2774                 lu_object_put_nocache(env, &parent->do_lu);
2775
2776 out:
2777         down_write(&com->lc_sem);
2778         com->lc_new_scanned++;
2779         com->lc_new_checked++;
2780         if (rc > 0) {
2781                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2782                 rc = 0;
2783         } else if (rc < 0) {
2784                 lo->ll_objs_failed_phase2++;
2785         }
2786         up_write(&com->lc_sem);
2787
2788         return rc;
2789 }
2790
2791 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2792                                     struct lfsck_component *com,
2793                                     struct lfsck_tgt_desc *ltd)
2794 {
2795         struct lfsck_layout             *lo     = com->lc_file_ram;
2796         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2797         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2798         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2799         struct ost_id                   *oi     = &info->lti_oi;
2800         struct lu_fid                   *fid    = &info->lti_fid;
2801         struct dt_object                *obj;
2802         const struct dt_it_ops          *iops;
2803         struct dt_it                    *di;
2804         int                              rc     = 0;
2805         ENTRY;
2806
2807         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2808                "scanning for OST%04x\n",
2809                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2810
2811         ostid_set_seq(oi, FID_SEQ_IDIF);
2812         ostid_set_id(oi, 0);
2813         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2814         if (rc != 0)
2815                 GOTO(log, rc);
2816
2817         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2818         if (unlikely(IS_ERR(obj)))
2819                 GOTO(log, rc = PTR_ERR(obj));
2820
2821         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2822         if (rc != 0)
2823                 GOTO(put, rc);
2824
2825         iops = &obj->do_index_ops->dio_it;
2826         di = iops->init(env, obj, 0, BYPASS_CAPA);
2827         if (IS_ERR(di))
2828                 GOTO(put, rc = PTR_ERR(di));
2829
2830         rc = iops->load(env, di, 0);
2831         if (rc == -ESRCH) {
2832                 /* -ESRCH means that the orphan OST-objects rbtree has been
2833                  * cleanup because of the OSS server restart or other errors. */
2834                 lo->ll_flags |= LF_INCOMPLETE;
2835                 GOTO(fini, rc);
2836         }
2837
2838         if (rc == 0)
2839                 rc = iops->next(env, di);
2840         else if (rc > 0)
2841                 rc = 0;
2842
2843         if (rc < 0)
2844                 GOTO(fini, rc);
2845
2846         if (rc > 0)
2847                 GOTO(fini, rc = 0);
2848
2849         do {
2850                 struct dt_key           *key;
2851                 struct lu_orphan_rec    *rec = &info->lti_rec;
2852
2853                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2854                     cfs_fail_val > 0) {
2855                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2856                         struct l_wait_info       lwi;
2857
2858                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2859                                           NULL, NULL);
2860                         l_wait_event(thread->t_ctl_waitq,
2861                                      !thread_is_running(thread),
2862                                      &lwi);
2863                 }
2864
2865                 key = iops->key(env, di);
2866                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2867                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2868                 if (rc == 0)
2869                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2870                                         &com->lc_fid_latest_scanned_phase2);
2871                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2872                         GOTO(fini, rc);
2873
2874                 lfsck_control_speed_by_self(com);
2875                 do {
2876                         rc = iops->next(env, di);
2877                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2878         } while (rc == 0);
2879
2880         GOTO(fini, rc);
2881
2882 fini:
2883         iops->put(env, di);
2884         iops->fini(env, di);
2885 put:
2886         lu_object_put(env, &obj->do_lu);
2887
2888 log:
2889         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2890                "scanning for OST%04x: rc = %d\n",
2891                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2892
2893         return rc > 0 ? 0 : rc;
2894 }
2895
2896 /* For the MDT-object with dangling reference, we need to repare the
2897  * inconsistency according to the LFSCK sponsor's requirement:
2898  *
2899  * 1) Keep the inconsistency there and report the inconsistency case,
2900  *    then give the chance to the application to find related issues,
2901  *    and the users can make the decision about how to handle it with
2902  *    more human knownledge. (by default)
2903  *
2904  * 2) Re-create the missed OST-object with the FID/owner information. */
2905 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2906                                         struct lfsck_component *com,
2907                                         struct lfsck_layout_req *llr,
2908                                         const struct lu_attr *pla)
2909 {
2910         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2911         struct filter_fid               *pfid   = &info->lti_new_pfid;
2912         struct dt_allocation_hint       *hint   = &info->lti_hint;
2913         struct lu_attr                  *cla    = &info->lti_la2;
2914         struct dt_object                *parent = llr->llr_parent->llo_obj;
2915         struct dt_object                *child  = llr->llr_child;
2916         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2917         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2918         struct thandle                  *handle;
2919         struct lu_buf                   *buf;
2920         struct lustre_handle             lh     = { 0 };
2921         int                              rc;
2922         bool                             create;
2923         ENTRY;
2924
2925         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2926                 create = true;
2927         else
2928                 create = false;
2929
2930         if (!create)
2931                 GOTO(log, rc = 1);
2932
2933         memset(cla, 0, sizeof(*cla));
2934         cla->la_uid = pla->la_uid;
2935         cla->la_gid = pla->la_gid;
2936         cla->la_mode = S_IFREG | 0666;
2937         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2938                         LA_ATIME | LA_MTIME | LA_CTIME;
2939
2940         rc = lfsck_layout_lock(env, com, parent, &lh,
2941                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2942         if (rc != 0)
2943                 GOTO(log, rc);
2944
2945         handle = dt_trans_create(env, dev);
2946         if (IS_ERR(handle))
2947                 GOTO(unlock1, rc = PTR_ERR(handle));
2948
2949         hint->dah_parent = NULL;
2950         hint->dah_mode = 0;
2951         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2952         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2953         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2954          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2955          * parent MDT-object's layout EA. */
2956         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2957         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2958
2959         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2960         if (rc != 0)
2961                 GOTO(stop, rc);
2962
2963         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2964                                   LU_XATTR_CREATE, handle);
2965         if (rc != 0)
2966                 GOTO(stop, rc);
2967
2968         rc = dt_trans_start(env, dev, handle);
2969         if (rc != 0)
2970                 GOTO(stop, rc);
2971
2972         dt_read_lock(env, parent, 0);
2973         if (unlikely(lfsck_is_dead_obj(parent)))
2974                 GOTO(unlock2, rc = 1);
2975
2976         rc = dt_create(env, child, cla, hint, NULL, handle);
2977         if (rc != 0)
2978                 GOTO(unlock2, rc);
2979
2980         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2981                           handle, BYPASS_CAPA);
2982
2983         GOTO(unlock2, rc);
2984
2985 unlock2:
2986         dt_read_unlock(env, parent);
2987
2988 stop:
2989         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2990
2991 unlock1:
2992         lfsck_layout_unlock(&lh);
2993
2994 log:
2995         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2996                "reference for: parent "DFID", child "DFID", OST-index %u, "
2997                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2998                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2999                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
3000                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
3001                create ? "Create the lost OST-object as required" :
3002                         "Keep the MDT-object there by default", rc);
3003
3004         return rc;
3005 }
3006
3007 /* If the OST-object does not recognize the MDT-object as its parent, and
3008  * there is no other MDT-object claims as its parent, then just trust the
3009  * given MDT-object as its parent. So update the OST-object filter_fid. */
3010 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
3011                                               struct lfsck_component *com,
3012                                               struct lfsck_layout_req *llr,
3013                                               const struct lu_attr *pla)
3014 {
3015         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3016         struct filter_fid               *pfid   = &info->lti_new_pfid;
3017         struct lu_attr                  *tla    = &info->lti_la3;
3018         struct dt_object                *parent = llr->llr_parent->llo_obj;
3019         struct dt_object                *child  = llr->llr_child;
3020         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3021         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
3022         struct thandle                  *handle;
3023         struct lu_buf                   *buf;
3024         struct lustre_handle             lh     = { 0 };
3025         int                              rc;
3026         ENTRY;
3027
3028         rc = lfsck_layout_lock(env, com, parent, &lh,
3029                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3030         if (rc != 0)
3031                 GOTO(log, rc);
3032
3033         handle = dt_trans_create(env, dev);
3034         if (IS_ERR(handle))
3035                 GOTO(unlock1, rc = PTR_ERR(handle));
3036
3037         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3038         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3039         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3040          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3041          * parent MDT-object's layout EA. */
3042         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3043         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3044
3045         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3046         if (rc != 0)
3047                 GOTO(stop, rc);
3048
3049         tla->la_valid = LA_UID | LA_GID;
3050         tla->la_uid = pla->la_uid;
3051         tla->la_gid = pla->la_gid;
3052         rc = dt_declare_attr_set(env, child, tla, handle);
3053         if (rc != 0)
3054                 GOTO(stop, rc);
3055
3056         rc = dt_trans_start(env, dev, handle);
3057         if (rc != 0)
3058                 GOTO(stop, rc);
3059
3060         dt_write_lock(env, parent, 0);
3061         if (unlikely(lfsck_is_dead_obj(parent)))
3062                 GOTO(unlock2, rc = 1);
3063
3064         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3065                           BYPASS_CAPA);
3066         if (rc != 0)
3067                 GOTO(unlock2, rc);
3068
3069         /* Get the latest parent's owner. */
3070         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3071         if (rc != 0)
3072                 GOTO(unlock2, rc);
3073
3074         tla->la_valid = LA_UID | LA_GID;
3075         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3076
3077         GOTO(unlock2, rc);
3078
3079 unlock2:
3080         dt_write_unlock(env, parent);
3081
3082 stop:
3083         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3084
3085 unlock1:
3086         lfsck_layout_unlock(&lh);
3087
3088 log:
3089         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3090                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3091                "stripe-index %u, owner %u/%u: rc = %d\n",
3092                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3093                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3094                pla->la_uid, pla->la_gid, rc);
3095
3096         return rc;
3097 }
3098
3099 /* If there are more than one MDT-objects claim as the OST-object's parent,
3100  * and the OST-object only recognizes one of them, then we need to generate
3101  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3102 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3103                                                    struct lfsck_component *com,
3104                                                    struct lfsck_layout_req *llr,
3105                                                    struct lu_attr *la,
3106                                                    struct lu_buf *buf)
3107 {
3108         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3109         struct dt_allocation_hint       *hint   = &info->lti_hint;
3110         struct dt_object_format         *dof    = &info->lti_dof;
3111         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3112         struct ost_id                   *oi     = &info->lti_oi;
3113         struct dt_object                *parent = llr->llr_parent->llo_obj;
3114         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3115         struct dt_object                *child  = NULL;
3116         struct lu_device                *d      = &cdev->dd_lu_dev;
3117         struct lu_object                *o      = NULL;
3118         struct thandle                  *handle;
3119         struct lov_mds_md_v1            *lmm;
3120         struct lov_ost_data_v1          *objs;
3121         struct lustre_handle             lh     = { 0 };
3122         struct lu_buf                    ea_buf;
3123         __u32                            magic;
3124         int                              rc;
3125         ENTRY;
3126
3127         rc = lfsck_layout_lock(env, com, parent, &lh,
3128                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3129         if (rc != 0)
3130                 GOTO(log, rc);
3131
3132         handle = dt_trans_create(env, pdev);
3133         if (IS_ERR(handle))
3134                 GOTO(unlock1, rc = PTR_ERR(handle));
3135
3136         o = lu_object_anon(env, d, NULL);
3137         if (IS_ERR(o))
3138                 GOTO(stop, rc = PTR_ERR(o));
3139
3140         child = container_of(o, struct dt_object, do_lu);
3141         o = lu_object_locate(o->lo_header, d->ld_type);
3142         if (unlikely(o == NULL))
3143                 GOTO(stop, rc = -EINVAL);
3144
3145         child = container_of(o, struct dt_object, do_lu);
3146         la->la_valid = LA_UID | LA_GID;
3147         hint->dah_parent = NULL;
3148         hint->dah_mode = 0;
3149         dof->dof_type = DFT_REGULAR;
3150         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
3151         if (rc != 0)
3152                 GOTO(stop, rc);
3153
3154         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3155                                   LU_XATTR_REPLACE, handle);
3156         if (rc != 0)
3157                 GOTO(stop, rc);
3158
3159         rc = dt_trans_start(env, pdev, handle);
3160         if (rc != 0)
3161                 GOTO(stop, rc);
3162
3163         dt_write_lock(env, parent, 0);
3164         if (unlikely(lfsck_is_dead_obj(parent)))
3165                 GOTO(unlock2, rc = 0);
3166
3167         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
3168         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
3169                 GOTO(unlock2, rc = 0);
3170
3171         lmm = buf->lb_buf;
3172         /* Someone change layout during the LFSCK, no need to repair then. */
3173         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
3174                 GOTO(unlock2, rc = 0);
3175
3176         rc = dt_create(env, child, la, hint, dof, handle);
3177         if (rc != 0)
3178                 GOTO(unlock2, rc);
3179
3180         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3181          * been verified in lfsck_layout_verify_header() already. If some
3182          * new magic introduced in the future, then layout LFSCK needs to
3183          * be updated also. */
3184         magic = le32_to_cpu(lmm->lmm_magic);
3185         if (magic == LOV_MAGIC_V1) {
3186                 objs = &lmm->lmm_objects[0];
3187         } else {
3188                 LASSERT(magic == LOV_MAGIC_V3);
3189                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3190         }
3191
3192         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
3193         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
3194         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
3195         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
3196         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
3197         lfsck_buf_init(&ea_buf, lmm,
3198                        lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count),
3199                                        magic));
3200         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV,
3201                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3202
3203         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
3204
3205 unlock2:
3206         dt_write_unlock(env, parent);
3207
3208 stop:
3209         if (child != NULL)
3210                 lu_object_put(env, &child->do_lu);
3211
3212         dt_trans_stop(env, pdev, handle);
3213
3214 unlock1:
3215         lfsck_layout_unlock(&lh);
3216
3217 log:
3218         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
3219                "references for: parent "DFID", OST-index %u, stripe-index %u, "
3220                "owner %u/%u: rc = %d\n",
3221                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3222                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3223
3224         return rc;
3225 }
3226
3227 /* If the MDT-object and the OST-object have different owner information,
3228  * then trust the MDT-object, because the normal chown/chgrp handle order
3229  * is from MDT to OST, and it is possible that some chown/chgrp operation
3230  * is partly done. */
3231 static int lfsck_layout_repair_owner(const struct lu_env *env,
3232                                      struct lfsck_component *com,
3233                                      struct lfsck_layout_req *llr,
3234                                      struct lu_attr *pla)
3235 {
3236         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3237         struct lu_attr                  *tla    = &info->lti_la3;
3238         struct dt_object                *parent = llr->llr_parent->llo_obj;
3239         struct dt_object                *child  = llr->llr_child;
3240         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3241         struct thandle                  *handle;
3242         int                              rc;
3243         ENTRY;
3244
3245         handle = dt_trans_create(env, dev);
3246         if (IS_ERR(handle))
3247                 GOTO(log, rc = PTR_ERR(handle));
3248
3249         tla->la_uid = pla->la_uid;
3250         tla->la_gid = pla->la_gid;
3251         tla->la_valid = LA_UID | LA_GID;
3252         rc = dt_declare_attr_set(env, child, tla, handle);
3253         if (rc != 0)
3254                 GOTO(stop, rc);
3255
3256         rc = dt_trans_start(env, dev, handle);
3257         if (rc != 0)
3258                 GOTO(stop, rc);
3259
3260         /* Use the dt_object lock to serialize with destroy and attr_set. */
3261         dt_read_lock(env, parent, 0);
3262         if (unlikely(lfsck_is_dead_obj(parent)))
3263                 GOTO(unlock, rc = 1);
3264
3265         /* Get the latest parent's owner. */
3266         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3267         if (rc != 0)
3268                 GOTO(unlock, rc);
3269
3270         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3271         if (unlikely(tla->la_uid != pla->la_uid ||
3272                      tla->la_gid != pla->la_gid))
3273                 GOTO(unlock, rc = 1);
3274
3275         tla->la_valid = LA_UID | LA_GID;
3276         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3277
3278         GOTO(unlock, rc);
3279
3280 unlock:
3281         dt_read_unlock(env, parent);
3282
3283 stop:
3284         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3285
3286 log:
3287         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3288                "file owner for: parent "DFID", child "DFID", OST-index %u, "
3289                "stripe-index %u, owner %u/%u: rc = %d\n",
3290                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3291                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3292                pla->la_uid, pla->la_gid, rc);
3293
3294         return rc;
3295 }
3296
3297 /* Check whether the OST-object correctly back points to the
3298  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3299 static int lfsck_layout_check_parent(const struct lu_env *env,
3300                                      struct lfsck_component *com,
3301                                      struct dt_object *parent,
3302                                      const struct lu_fid *pfid,
3303                                      const struct lu_fid *cfid,
3304                                      const struct lu_attr *pla,
3305                                      const struct lu_attr *cla,
3306                                      struct lfsck_layout_req *llr,
3307                                      struct lu_buf *lov_ea, __u32 idx)
3308 {
3309         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3310         struct lu_buf                   *buf    = &info->lti_big_buf;
3311         struct dt_object                *tobj;
3312         struct lov_mds_md_v1            *lmm;
3313         struct lov_ost_data_v1          *objs;
3314         int                              rc;
3315         int                              i;
3316         __u32                            magic;
3317         __u16                            count;
3318         ENTRY;
3319
3320         if (fid_is_zero(pfid)) {
3321                 /* client never wrote. */
3322                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3323                         if (unlikely(cla->la_uid != pla->la_uid ||
3324                                      cla->la_gid != pla->la_gid))
3325                                 RETURN (LLIT_INCONSISTENT_OWNER);
3326
3327                         RETURN(0);
3328                 }
3329
3330                 RETURN(LLIT_UNMATCHED_PAIR);
3331         }
3332
3333         if (unlikely(!fid_is_sane(pfid)))
3334                 RETURN(LLIT_UNMATCHED_PAIR);
3335
3336         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3337                 if (llr->llr_lov_idx == idx)
3338                         RETURN(0);
3339
3340                 RETURN(LLIT_UNMATCHED_PAIR);
3341         }
3342
3343         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3344         if (IS_ERR(tobj))
3345                 RETURN(PTR_ERR(tobj));
3346
3347         dt_read_lock(env, tobj, 0);
3348         if (dt_object_exists(tobj) == 0 ||
3349             lfsck_is_dead_obj(tobj))
3350                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3351
3352         if (!S_ISREG(lfsck_object_type(tobj)))
3353                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3354
3355         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3356          * remote one on another MDT. Then check whether the given OST-object
3357          * is in such layout. If yes, it is multiple referenced, otherwise it
3358          * is unmatched referenced case. */
3359         rc = lfsck_layout_get_lovea(env, tobj, buf);
3360         if (rc == 0 || rc == -ENOENT)
3361                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3362
3363         if (rc < 0)
3364                 GOTO(out, rc);
3365
3366         lmm = buf->lb_buf;
3367         magic = le32_to_cpu(lmm->lmm_magic);
3368         if (magic == LOV_MAGIC_V1) {
3369                 objs = &lmm->lmm_objects[0];
3370         } else {
3371                 LASSERT(magic == LOV_MAGIC_V3);
3372                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3373         }
3374
3375         count = le16_to_cpu(lmm->lmm_stripe_count);
3376         for (i = 0; i < count; i++, objs++) {
3377                 struct lu_fid           *tfid   = &info->lti_fid2;
3378                 struct ost_id           *oi     = &info->lti_oi;
3379                 __u32                    idx2;
3380
3381                 if (lovea_slot_is_dummy(objs))
3382                         continue;
3383
3384                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3385                 idx2 = le32_to_cpu(objs->l_ost_idx);
3386                 rc = ostid_to_fid(tfid, oi, idx2);
3387                 if (rc != 0) {
3388                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
3389                                "invalid layout EA at the slot %d, index %u\n",
3390                                lfsck_lfsck2name(com->lc_lfsck),
3391                                PFID(pfid), i, idx2);
3392
3393                         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3394                 }
3395
3396                 if (lu_fid_eq(cfid, tfid)) {
3397                         *lov_ea = *buf;
3398
3399                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3400                 }
3401         }
3402
3403         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3404
3405 out:
3406         dt_read_unlock(env, tobj);
3407         lfsck_object_put(env, tobj);
3408
3409         return rc;
3410 }
3411
3412 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3413                                              struct lfsck_component *com,
3414                                              struct lfsck_layout_req *llr)
3415 {
3416         struct lfsck_layout                  *lo     = com->lc_file_ram;
3417         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3418         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3419         struct lu_fid                        *pfid   = &info->lti_fid;
3420         struct lu_buf                         buf    = { 0 };
3421         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3422         struct dt_object                     *child  = llr->llr_child;
3423         struct lu_attr                       *pla    = &info->lti_la;
3424         struct lu_attr                       *cla    = &info->lti_la2;
3425         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3426         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3427         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3428         __u32                                 idx    = 0;
3429         int                                   rc;
3430         ENTRY;
3431
3432         if (unlikely(lfsck_is_dead_obj(parent)))
3433                 RETURN(0);
3434
3435         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3436         if (rc != 0)
3437                 GOTO(out, rc);
3438
3439         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3440         if (rc == -ENOENT) {
3441                 if (unlikely(lfsck_is_dead_obj(parent)))
3442                         RETURN(0);
3443
3444                 type = LLIT_DANGLING;
3445                 goto repair;
3446         }
3447
3448         if (rc != 0)
3449                 GOTO(out, rc);
3450
3451         lfsck_buf_init(&buf, pea, sizeof(struct filter_fid_old));
3452         rc = dt_xattr_get(env, child, &buf, XATTR_NAME_FID, BYPASS_CAPA);
3453         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3454                      rc != sizeof(struct filter_fid))) {
3455                 type = LLIT_UNMATCHED_PAIR;
3456                 goto repair;
3457         }
3458
3459         if (rc < 0 && rc != -ENODATA)
3460                 GOTO(out, rc);
3461
3462         if (rc == -ENODATA) {
3463                 fid_zero(pfid);
3464         } else {
3465                 fid_le_to_cpu(pfid, &pea->ff_parent);
3466                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3467                  * real parent MDT-object's FID::f_ver, instead it is the
3468                  * OST-object index in its parent MDT-object's layout EA. */
3469                 idx = pfid->f_stripe_idx;
3470                 pfid->f_ver = 0;
3471         }
3472
3473         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3474                                        lu_object_fid(&child->do_lu),
3475                                        pla, cla, llr, &buf, idx);
3476         if (rc > 0) {
3477                 type = rc;
3478                 goto repair;
3479         }
3480
3481         if (rc < 0)
3482                 GOTO(out, rc);
3483
3484         if (unlikely(cla->la_uid != pla->la_uid ||
3485                      cla->la_gid != pla->la_gid)) {
3486                 type = LLIT_INCONSISTENT_OWNER;
3487                 goto repair;
3488         }
3489
3490 repair:
3491         if (bk->lb_param & LPF_DRYRUN) {
3492                 if (type != LLIT_NONE)
3493                         GOTO(out, rc = 1);
3494                 else
3495                         GOTO(out, rc = 0);
3496         }
3497
3498         switch (type) {
3499         case LLIT_DANGLING:
3500                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3501                 break;
3502         case LLIT_UNMATCHED_PAIR:
3503                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3504                 break;
3505         case LLIT_MULTIPLE_REFERENCED:
3506                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3507                                                              pla, &buf);
3508                 break;
3509         case LLIT_INCONSISTENT_OWNER:
3510                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3511                 break;
3512         default:
3513                 rc = 0;
3514                 break;
3515         }
3516
3517         GOTO(out, rc);
3518
3519 out:
3520         down_write(&com->lc_sem);
3521         if (rc < 0) {
3522                 struct lfsck_layout_master_data *llmd = com->lc_data;
3523
3524                 if (unlikely(llmd->llmd_exit)) {
3525                         rc = 0;
3526                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3527                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3528                            rc == -EHOSTUNREACH) {
3529                         /* If cannot touch the target server,
3530                          * mark the LFSCK as INCOMPLETE. */
3531                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3532                                "talk with OST %x: rc = %d\n",
3533                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3534                         lo->ll_flags |= LF_INCOMPLETE;
3535                         lo->ll_objs_skipped++;
3536                         rc = 0;
3537                 } else {
3538                         lfsck_layout_record_failure(env, lfsck, lo);
3539                 }
3540         } else if (rc > 0) {
3541                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3542                          "unknown type = %d\n", type);
3543
3544                 lo->ll_objs_repaired[type - 1]++;
3545                 if (bk->lb_param & LPF_DRYRUN &&