Whamcloud - gitweb
LU-4788 lfsck: take ldlm lock before modifying visible object
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_fid.h>
43 #include <lustre_lib.h>
44 #include <lustre_net.h>
45 #include <lustre/lustre_user.h>
46 #include <md_object.h>
47 #include <obd_class.h>
48
49 #include "lfsck_internal.h"
50
/* Magic value stored in lfsck_layout::ll_magic. lfsck_layout_init() writes
 * it and lfsck_layout_load() rejects any loaded record that does not carry
 * it. */
#define LFSCK_LAYOUT_MAGIC              0xB173AE14

/* NOTE(review): presumably the name of the layout LFSCK component/trace
 * file; its users are outside this part of the file. */
static const char lfsck_layout_name[] = "lfsck_layout";
54
/* Per-sequence record kept on lfsck_layout_slave_data::llsd_seq_list.
 * The list is kept sorted by ascending lls_seq (see
 * lfsck_layout_seq_insert() and lfsck_layout_seq_lookup()). */
struct lfsck_layout_seq {
        /* link into lfsck_layout_slave_data::llsd_seq_list. */
        struct list_head         lls_list;
        /* The sequence this record describes; also the sort key. */
        __u64                    lls_seq;
        /* NOTE(review): presumably the LAST_ID value as stored for this
         * sequence - confirm against the users of this field. */
        __u64                    lls_lastid;
        /* NOTE(review): presumably the largest object ID seen so far for
         * this sequence - confirm against the users of this field. */
        __u64                    lls_lastid_known;
        /* NOTE(review): presumably the object holding LAST_ID - confirm. */
        struct dt_object        *lls_lastid_obj;
        /* NOTE(review): presumably set when the record needs to be written
         * back - confirm. */
        unsigned int             lls_dirty:1;
};
63
/* One entry per target tracked on the slave side. Entries are
 * reference-counted (llst_ref) and are unique per llst_index on
 * lfsck_layout_slave_data::llsd_master_list (see lfsck_layout_llst_add()). */
struct lfsck_layout_slave_target {
        /* link into lfsck_layout_slave_data::llsd_master_list. */
        struct list_head        llst_list;
        /* The position for next record in the rbtree for iteration. */
        struct lu_fid           llst_fid;
        /* Dummy hash for iteration against the rbtree. */
        __u64                   llst_hash;
        /* Initialized to 0 by lfsck_layout_llst_add(). */
        __u64                   llst_gen;
        /* Reference count; the entry is freed when it drops to zero
         * (see lfsck_layout_llst_put()). */
        atomic_t                llst_ref;
        /* Index identifying the target; the lookup key on the list. */
        __u32                   llst_index;
};
75
/* Slave-side private data of the layout LFSCK component (reached through
 * lfsck_component::lc_data in the rbtree helpers). */
struct lfsck_layout_slave_data {
        /* list for lfsck_layout_seq */
        struct list_head         llsd_seq_list;

        /* list for the masters involved in layout verification. */
        struct list_head         llsd_master_list;
        /* Protects llsd_master_list. */
        spinlock_t               llsd_lock;
        __u64                    llsd_touch_gen;
        /* In-RAM object standing for the layout rbtree; set up by
         * lfsck_rbtree_setup() and released by lfsck_rbtree_cleanup(). */
        struct dt_object        *llsd_rb_obj;
        /* Root of the rbtree of lfsck_rbtree_node. */
        struct rb_root           llsd_rb_root;
        /* Protects llsd_rb_root and llsd_rbtree_valid. */
        rwlock_t                 llsd_rb_lock;
        /* Cleared by lfsck_rbtree_cleanup(); once cleared, the rbtree must
         * not be searched or modified any more. */
        unsigned int             llsd_rbtree_valid:1;
};
89
/* Reference-counted wrapper around a dt_object, created by
 * lfsck_layout_object_init() and released by lfsck_layout_object_put(). */
struct lfsck_layout_object {
        /* The wrapped object; a reference is held while the wrapper lives. */
        struct dt_object        *llo_obj;
        /* Attributes of llo_obj as fetched when the wrapper was created. */
        struct lu_attr           llo_attr;
        /* Reference count; starts at 1, each lfsck_layout_req adds one. */
        atomic_t                 llo_ref;
        /* Generation used to detect layout changes between LFSCK
         * pre-fetching and the real verification. */
        __u16                    llo_gen;
};
96
/* A request pairing a parent (wrapped in lfsck_layout_object) with one
 * child object; built by lfsck_layout_req_init() and torn down by
 * lfsck_layout_req_fini(). */
struct lfsck_layout_req {
        /* List linkage; initialized empty by lfsck_layout_req_init().
         * NOTE(review): presumably linked into
         * lfsck_layout_master_data::llmd_req_list - confirm. */
        struct list_head                 llr_list;
        /* Parent wrapper; the request holds one llo_ref on it. */
        struct lfsck_layout_object      *llr_parent;
        /* Child object; its reference is dropped by lfsck_layout_req_fini(). */
        struct dt_object                *llr_child;
        /* NOTE(review): presumably the index of the OST holding the child. */
        __u32                            llr_ost_idx;
        __u32                            llr_lov_idx; /* offset in LOV EA */
};
104
/* Master-side private data of the layout LFSCK component. */
struct lfsck_layout_master_data {
        /* Protects llmd_req_list (see lfsck_layout_req_empty()). */
        spinlock_t              llmd_lock;
        /* List of pending requests. */
        struct list_head        llmd_req_list;

        /* list for the ost targets involved in layout verification. */
        struct list_head        llmd_ost_list;

        /* list for the ost targets in phase1 scanning. */
        struct list_head        llmd_ost_phase1_list;

        /* list for the ost targets in phase2 scanning. */
        struct list_head        llmd_ost_phase2_list;

        /* list for the mdt targets involved in layout verification. */
        struct list_head        llmd_mdt_list;

        /* list for the mdt targets in phase1 scanning. */
        struct list_head        llmd_mdt_phase1_list;

        /* list for the mdt targets in phase2 scanning. */
        struct list_head        llmd_mdt_phase2_list;

        /* NOTE(review): presumably the assistant thread that consumes
         * llmd_req_list - confirm against the thread code. */
        struct ptlrpc_thread    llmd_thread;
        __u32                   llmd_touch_gen;
        int                     llmd_prefetched;
        int                     llmd_assistant_status;
        int                     llmd_post_result;
        unsigned int            llmd_to_post:1,
                                llmd_to_double_scan:1,
                                llmd_in_double_scan:1,
                                llmd_exit:1;
};
137
/* Bundle of pointers carried with a slave-side asynchronous request.
 * NOTE(review): the users are outside this part of the file; presumably
 * these are the arguments handed to the RPC interpret callback - confirm. */
struct lfsck_layout_slave_async_args {
        struct obd_export                *llsaa_exp;
        struct lfsck_component           *llsaa_com;
        struct lfsck_layout_slave_target *llsaa_llst;
};
143
144 static struct lfsck_layout_object *
145 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
146                          __u16 gen)
147 {
148         struct lfsck_layout_object *llo;
149         int                         rc;
150
151         OBD_ALLOC_PTR(llo);
152         if (llo == NULL)
153                 return ERR_PTR(-ENOMEM);
154
155         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
156         if (rc != 0) {
157                 OBD_FREE_PTR(llo);
158
159                 return ERR_PTR(rc);
160         }
161
162         lu_object_get(&obj->do_lu);
163         llo->llo_obj = obj;
164         /* The gen can be used to check whether some others have changed the
165          * file layout after LFSCK pre-fetching but before real verification. */
166         llo->llo_gen = gen;
167         atomic_set(&llo->llo_ref, 1);
168
169         return llo;
170 }
171
172 static inline void
173 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
174 {
175         if (atomic_dec_and_test(&llst->llst_ref)) {
176                 LASSERT(list_empty(&llst->llst_list));
177
178                 OBD_FREE_PTR(llst);
179         }
180 }
181
182 static inline int
183 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
184 {
185         struct lfsck_layout_slave_target *llst;
186         struct lfsck_layout_slave_target *tmp;
187         int                               rc   = 0;
188
189         OBD_ALLOC_PTR(llst);
190         if (llst == NULL)
191                 return -ENOMEM;
192
193         INIT_LIST_HEAD(&llst->llst_list);
194         llst->llst_gen = 0;
195         llst->llst_index = index;
196         atomic_set(&llst->llst_ref, 1);
197
198         spin_lock(&llsd->llsd_lock);
199         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
200                 if (tmp->llst_index == index) {
201                         rc = -EALREADY;
202                         break;
203                 }
204         }
205         if (rc == 0)
206                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
207         spin_unlock(&llsd->llsd_lock);
208
209         if (rc != 0)
210                 OBD_FREE_PTR(llst);
211
212         return rc;
213 }
214
215 static inline void
216 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
217                       struct lfsck_layout_slave_target *llst)
218 {
219         bool del = false;
220
221         spin_lock(&llsd->llsd_lock);
222         if (!list_empty(&llst->llst_list)) {
223                 list_del_init(&llst->llst_list);
224                 del = true;
225         }
226         spin_unlock(&llsd->llsd_lock);
227
228         if (del)
229                 lfsck_layout_llst_put(llst);
230 }
231
232 static inline struct lfsck_layout_slave_target *
233 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
234                                __u32 index, bool unlink)
235 {
236         struct lfsck_layout_slave_target *llst;
237
238         spin_lock(&llsd->llsd_lock);
239         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
240                 if (llst->llst_index == index) {
241                         if (unlink)
242                                 list_del_init(&llst->llst_list);
243                         else
244                                 atomic_inc(&llst->llst_ref);
245                         spin_unlock(&llsd->llsd_lock);
246
247                         return llst;
248                 }
249         }
250         spin_unlock(&llsd->llsd_lock);
251
252         return NULL;
253 }
254
255 static inline void lfsck_layout_object_put(const struct lu_env *env,
256                                            struct lfsck_layout_object *llo)
257 {
258         if (atomic_dec_and_test(&llo->llo_ref)) {
259                 lfsck_object_put(env, llo->llo_obj);
260                 OBD_FREE_PTR(llo);
261         }
262 }
263
264 static struct lfsck_layout_req *
265 lfsck_layout_req_init(struct lfsck_layout_object *parent,
266                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
267 {
268         struct lfsck_layout_req *llr;
269
270         OBD_ALLOC_PTR(llr);
271         if (llr == NULL)
272                 return ERR_PTR(-ENOMEM);
273
274         INIT_LIST_HEAD(&llr->llr_list);
275         atomic_inc(&parent->llo_ref);
276         llr->llr_parent = parent;
277         llr->llr_child = child;
278         llr->llr_ost_idx = ost_idx;
279         llr->llr_lov_idx = lov_idx;
280
281         return llr;
282 }
283
284 static inline void lfsck_layout_req_fini(const struct lu_env *env,
285                                          struct lfsck_layout_req *llr)
286 {
287         lu_object_put(env, &llr->llr_child->do_lu);
288         lfsck_layout_object_put(env, llr->llr_parent);
289         OBD_FREE_PTR(llr);
290 }
291
292 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
293 {
294         bool empty = false;
295
296         spin_lock(&llmd->llmd_lock);
297         if (list_empty(&llmd->llmd_req_list))
298                 empty = true;
299         spin_unlock(&llmd->llmd_lock);
300
301         return empty;
302 }
303
304 static int lfsck_layout_get_lovea(const struct lu_env *env,
305                                   struct dt_object *obj, struct lu_buf *buf)
306 {
307         int rc;
308
309 again:
310         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
311         if (rc == -ERANGE) {
312                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
313                                   BYPASS_CAPA);
314                 if (rc <= 0)
315                         return rc;
316
317                 lu_buf_realloc(buf, rc);
318                 if (buf->lb_buf == NULL)
319                         return -ENOMEM;
320
321                 goto again;
322         }
323
324         if (rc == -ENODATA)
325                 rc = 0;
326
327         if (rc <= 0)
328                 return rc;
329
330         if (unlikely(buf->lb_buf == NULL)) {
331                 lu_buf_alloc(buf, rc);
332                 if (buf->lb_buf == NULL)
333                         return -ENOMEM;
334
335                 goto again;
336         }
337
338         return rc;
339 }
340
341 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
342 {
343         __u32 magic;
344         __u32 pattern;
345
346         magic = le32_to_cpu(lmm->lmm_magic);
347         /* If magic crashed, keep it there. Sometime later, during OST-object
348          * orphan handling, if some OST-object(s) back-point to it, it can be
349          * verified and repaired. */
350         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
351                 struct ost_id   oi;
352                 int             rc;
353
354                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
355                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
356                         rc = -EOPNOTSUPP;
357                 else
358                         rc = -EINVAL;
359
360                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
361                        rc == -EINVAL ? "Unknown" : "Unsupported",
362                        magic, POSTID(&oi));
363
364                 return rc;
365         }
366
367         pattern = le32_to_cpu(lmm->lmm_pattern);
368         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
369         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
370                 struct ost_id oi;
371
372                 lmm_oi_le_to_cpu(&oi, &lmm->lmm_oi);
373                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
374                        pattern, POSTID(&oi));
375
376                 return -EOPNOTSUPP;
377         }
378
379         return 0;
380 }
381
/* Size in bytes of each bitmap attached to a lfsck_rbtree_node. */
#define LFSCK_RBTREE_BITMAP_SIZE        PAGE_CACHE_SIZE
/* Number of bits per bitmap, i.e. number of OIDs covered by one node. */
#define LFSCK_RBTREE_BITMAP_WIDTH       (LFSCK_RBTREE_BITMAP_SIZE << 3)
/* Mask extracting the bit index of an OID inside its node's bitmap. */
#define LFSCK_RBTREE_BITMAP_MASK        (LFSCK_RBTREE_BITMAP_WIDTH - 1)
385
/* One rbtree node covers LFSCK_RBTREE_BITMAP_WIDTH consecutive OIDs of a
 * single sequence, starting at lrn_first_oid, and tracks them with two
 * bitmaps of LFSCK_RBTREE_BITMAP_SIZE bytes each. */
struct lfsck_rbtree_node {
        /* link into lfsck_layout_slave_data::llsd_rb_root. */
        struct rb_node   lrn_node;
        /* The FID sequence covered by this node. */
        __u64            lrn_seq;
        /* The first OID covered; aligned to LFSCK_RBTREE_BITMAP_WIDTH. */
        __u32            lrn_first_oid;
        /* Number of bits set in lrn_known_bitmap. */
        atomic_t         lrn_known_count;
        /* Number of bits set in lrn_accessed_bitmap. */
        atomic_t         lrn_accessed_count;
        /* One bit per known object. */
        void            *lrn_known_bitmap;
        /* One bit per accessed object; any accessed object is also marked
         * as known (see lfsck_rbtree_update_bitmap()). */
        void            *lrn_accessed_bitmap;
};
395
396 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
397                                    __u64 seq, __u32 oid)
398 {
399         if (seq < lrn->lrn_seq)
400                 return -1;
401
402         if (seq > lrn->lrn_seq)
403                 return 1;
404
405         if (oid < lrn->lrn_first_oid)
406                 return -1;
407
408         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
409                 return 1;
410
411         return 0;
412 }
413
414 /* The caller should hold llsd->llsd_rb_lock. */
415 static struct lfsck_rbtree_node *
416 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
417                     const struct lu_fid *fid, bool *exact)
418 {
419         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
420         struct rb_node           *prev  = NULL;
421         struct lfsck_rbtree_node *lrn   = NULL;
422         int                       rc    = 0;
423
424         if (exact != NULL)
425                 *exact = true;
426
427         while (node != NULL) {
428                 prev = node;
429                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
430                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
431                 if (rc < 0)
432                         node = node->rb_left;
433                 else if (rc > 0)
434                         node = node->rb_right;
435                 else
436                         return lrn;
437         }
438
439         if (exact == NULL)
440                 return NULL;
441
442         /* If there is no exactly matched one, then to the next valid one. */
443         *exact = false;
444
445         /* The rbtree is empty. */
446         if (rc == 0)
447                 return NULL;
448
449         if (rc < 0)
450                 return lrn;
451
452         node = rb_next(prev);
453
454         /* The end of the rbtree. */
455         if (node == NULL)
456                 return NULL;
457
458         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
459
460         return lrn;
461 }
462
463 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
464                                                   const struct lu_fid *fid)
465 {
466         struct lfsck_rbtree_node *lrn;
467
468         OBD_ALLOC_PTR(lrn);
469         if (lrn == NULL)
470                 return ERR_PTR(-ENOMEM);
471
472         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
473         if (lrn->lrn_known_bitmap == NULL) {
474                 OBD_FREE_PTR(lrn);
475
476                 return ERR_PTR(-ENOMEM);
477         }
478
479         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
480         if (lrn->lrn_accessed_bitmap == NULL) {
481                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         RB_CLEAR_NODE(&lrn->lrn_node);
488         lrn->lrn_seq = fid_seq(fid);
489         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
490         atomic_set(&lrn->lrn_known_count, 0);
491         atomic_set(&lrn->lrn_accessed_count, 0);
492
493         return lrn;
494 }
495
496 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
497 {
498         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
499         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
500         OBD_FREE_PTR(lrn);
501 }
502
503 /* The caller should hold lock. */
504 static struct lfsck_rbtree_node *
505 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
506                     struct lfsck_rbtree_node *lrn)
507 {
508         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
509         struct rb_node            *parent = NULL;
510         struct lfsck_rbtree_node  *tmp;
511         int                        rc;
512
513         while (*pos != NULL) {
514                 parent = *pos;
515                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
516                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
517                 if (rc < 0)
518                         pos = &(*pos)->rb_left;
519                 else if (rc > 0)
520                         pos = &(*pos)->rb_right;
521                 else
522                         return tmp;
523         }
524
525         rb_link_node(&lrn->lrn_node, parent, pos);
526         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
527
528         return lrn;
529 }
530
531 extern const struct dt_index_operations lfsck_orphan_index_ops;
532
533 static int lfsck_rbtree_setup(const struct lu_env *env,
534                               struct lfsck_component *com)
535 {
536         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
537         struct lfsck_instance           *lfsck  = com->lc_lfsck;
538         struct dt_device                *dev    = lfsck->li_bottom;
539         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
540         struct dt_object                *obj;
541
542         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
543         fid->f_oid = lfsck_dev_idx(dev);
544         fid->f_ver = 0;
545         obj = dt_locate(env, dev, fid);
546         if (IS_ERR(obj))
547                 RETURN(PTR_ERR(obj));
548
549         /* Generate an in-RAM object to stand for the layout rbtree.
550          * Scanning the layout rbtree will be via the iteration over
551          * the object. In the future, the rbtree may be written onto
552          * disk with the object.
553          *
554          * Mark the object to be as exist. */
555         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
556         obj->do_index_ops = &lfsck_orphan_index_ops;
557         llsd->llsd_rb_obj = obj;
558         llsd->llsd_rbtree_valid = 1;
559         dev->dd_record_fid_accessed = 1;
560
561         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
562                lfsck_lfsck2name(lfsck));
563
564         return 0;
565 }
566
567 static void lfsck_rbtree_cleanup(const struct lu_env *env,
568                                  struct lfsck_component *com)
569 {
570         struct lfsck_instance           *lfsck = com->lc_lfsck;
571         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
572         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
573         struct rb_node                  *next;
574         struct lfsck_rbtree_node        *lrn;
575
576         lfsck->li_bottom->dd_record_fid_accessed = 0;
577         /* Invalid the rbtree, then no others will use it. */
578         write_lock(&llsd->llsd_rb_lock);
579         llsd->llsd_rbtree_valid = 0;
580         write_unlock(&llsd->llsd_rb_lock);
581
582         while (node != NULL) {
583                 next = rb_next(node);
584                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
585                 rb_erase(node, &llsd->llsd_rb_root);
586                 lfsck_rbtree_free(lrn);
587                 node = next;
588         }
589
590         if (llsd->llsd_rb_obj != NULL) {
591                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
592                 llsd->llsd_rb_obj = NULL;
593         }
594
595         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
596                lfsck_lfsck2name(lfsck));
597 }
598
/**
 * Record \a fid in the OST-objects accessing rbtree.
 *
 * The object is always marked as known; it is marked as accessed as well
 * if \a accessed is true. If no node covers \a fid yet, a new one is
 * allocated and inserted. If that allocation fails while \a accessed is
 * true, LF_INCOMPLETE is set and the whole rbtree is cleaned up.
 *
 * \param[in] env       execution environment
 * \param[in] com       the layout LFSCK component; its lc_data is the
 *                      lfsck_layout_slave_data
 * \param[in] fid       the FID of the object to be recorded
 * \param[in] accessed  whether the object has been accessed
 */
static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
                                       struct lfsck_component *com,
                                       const struct lu_fid *fid,
                                       bool accessed)
{
        struct lfsck_layout_slave_data  *llsd   = com->lc_data;
        struct lfsck_rbtree_node        *lrn;
        /* Tells the "unlock" label which mode of llsd_rb_lock is held:
         * true for the write lock, false for the read lock. */
        bool                             insert = false;
        int                              idx;
        int                              rc     = 0;
        ENTRY;

        /* Insane FIDs and LAST_ID FIDs are not tracked. */
        if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
                RETURN_EXIT;

        /* Only IDIF and normal FIDs are tracked. */
        if (!fid_is_idif(fid) && !fid_is_norm(fid))
                RETURN_EXIT;

        read_lock(&llsd->llsd_rb_lock);
        if (!llsd->llsd_rbtree_valid)
                GOTO(unlock, rc = 0);

        lrn = lfsck_rbtree_search(llsd, fid, NULL);
        if (lrn == NULL) {
                struct lfsck_rbtree_node *tmp;

                LASSERT(!insert);

                /* Drop the read lock before allocating the new node; the
                 * failure path below jumps to "out" with no lock held. */
                read_unlock(&llsd->llsd_rb_lock);
                tmp = lfsck_rbtree_new(env, fid);
                if (IS_ERR(tmp))
                        GOTO(out, rc = PTR_ERR(tmp));

                insert = true;
                write_lock(&llsd->llsd_rb_lock);
                /* The rbtree may have been invalidated while no lock was
                 * held, so check again. */
                if (!llsd->llsd_rbtree_valid) {
                        lfsck_rbtree_free(tmp);
                        GOTO(unlock, rc = 0);
                }

                /* Somebody else may have inserted a node for the same range
                 * in the meantime; then use that one and discard ours. */
                lrn = lfsck_rbtree_insert(llsd, tmp);
                if (lrn != tmp)
                        lfsck_rbtree_free(tmp);
        }

        idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
        /* Any accessed object must be a known object. */
        if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
                atomic_inc(&lrn->lrn_known_count);
        if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
                atomic_inc(&lrn->lrn_accessed_count);

        GOTO(unlock, rc = 0);

unlock:
        if (insert)
                write_unlock(&llsd->llsd_rb_lock);
        else
                read_unlock(&llsd->llsd_rb_lock);
out:
        /* rc is only non-zero when the node allocation above failed. */
        if (rc != 0 && accessed) {
                struct lfsck_layout *lo = com->lc_file_ram;

                CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
                       "bitmap, and will cause incorrect LFSCK OST-object "
                       "handling, so disable it to cancel orphan handling "
                       "for related device. rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), rc);

                lo->ll_flags |= LF_INCOMPLETE;
                lfsck_rbtree_cleanup(env, com);
        }
}
672
673 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
674                                    const struct lfsck_layout *src)
675 {
676         int i;
677
678         des->ll_magic = le32_to_cpu(src->ll_magic);
679         des->ll_status = le32_to_cpu(src->ll_status);
680         des->ll_flags = le32_to_cpu(src->ll_flags);
681         des->ll_success_count = le32_to_cpu(src->ll_success_count);
682         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
683         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
684         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
685         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
686         des->ll_time_last_checkpoint =
687                                 le64_to_cpu(src->ll_time_last_checkpoint);
688         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
689         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
690         des->ll_pos_first_inconsistent =
691                         le64_to_cpu(src->ll_pos_first_inconsistent);
692         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
693         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
694         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
695         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
696         for (i = 0; i < LLIT_MAX; i++)
697                 des->ll_objs_repaired[i] =
698                                 le64_to_cpu(src->ll_objs_repaired[i]);
699         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
700 }
701
702 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
703                                    const struct lfsck_layout *src)
704 {
705         int i;
706
707         des->ll_magic = cpu_to_le32(src->ll_magic);
708         des->ll_status = cpu_to_le32(src->ll_status);
709         des->ll_flags = cpu_to_le32(src->ll_flags);
710         des->ll_success_count = cpu_to_le32(src->ll_success_count);
711         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
712         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
713         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
714         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
715         des->ll_time_last_checkpoint =
716                                 cpu_to_le64(src->ll_time_last_checkpoint);
717         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
718         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
719         des->ll_pos_first_inconsistent =
720                         cpu_to_le64(src->ll_pos_first_inconsistent);
721         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
722         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
723         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
724         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
725         for (i = 0; i < LLIT_MAX; i++)
726                 des->ll_objs_repaired[i] =
727                                 cpu_to_le64(src->ll_objs_repaired[i]);
728         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
729 }
730
731 /**
732  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
733  * \retval 0: succeed.
734  * \retval -ve: failed cases.
735  */
736 static int lfsck_layout_load(const struct lu_env *env,
737                              struct lfsck_component *com)
738 {
739         struct lfsck_layout             *lo     = com->lc_file_ram;
740         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
741         ssize_t                          size   = com->lc_file_size;
742         loff_t                           pos    = 0;
743         int                              rc;
744
745         rc = dbo->dbo_read(env, com->lc_obj,
746                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
747                            BYPASS_CAPA);
748         if (rc == 0) {
749                 return -ENOENT;
750         } else if (rc < 0) {
751                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
752                        lfsck_lfsck2name(com->lc_lfsck), rc);
753                 return rc;
754         } else if (rc != size) {
755                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
756                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
757                 return 1;
758         }
759
760         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
761         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
762                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
763                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
764                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
765                 return 1;
766         }
767
768         return 0;
769 }
770
/**
 * Write the in-RAM lfsck_layout record of \a com to its backing object.
 *
 * The record in com->lc_file_ram is converted to little-endian into
 * com->lc_file_disk, which is then written at offset 0 inside a local
 * transaction on the bottom device.
 *
 * \param[in] env       execution environment
 * \param[in] com       the layout LFSCK component
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_layout_store(const struct lu_env *env,
                              struct lfsck_component *com)
{
        struct dt_object         *obj           = com->lc_obj;
        struct lfsck_instance    *lfsck         = com->lc_lfsck;
        struct lfsck_layout      *lo            = com->lc_file_disk;
        struct thandle           *handle;
        ssize_t                   size          = com->lc_file_size;
        loff_t                    pos           = 0;
        int                       rc;
        ENTRY;

        lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
        handle = dt_trans_create(env, lfsck->li_bottom);
        /* No transaction to stop in this case, so skip the "out" label. */
        if (IS_ERR(handle))
                GOTO(log, rc = PTR_ERR(handle));

        rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
                                     pos, handle);
        if (rc != 0)
                GOTO(out, rc);

        rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
        if (rc != 0)
                GOTO(out, rc);

        rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
                             handle);

        GOTO(out, rc);

out:
        /* The transaction is stopped on both success and failure. */
        dt_trans_stop(env, lfsck->li_bottom, handle);

log:
        if (rc != 0)
                CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
                       lfsck_lfsck2name(lfsck), rc);
        return rc;
}
811
812 static int lfsck_layout_init(const struct lu_env *env,
813                              struct lfsck_component *com)
814 {
815         struct lfsck_layout *lo = com->lc_file_ram;
816         int rc;
817
818         memset(lo, 0, com->lc_file_size);
819         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
820         lo->ll_status = LS_INIT;
821         down_write(&com->lc_sem);
822         rc = lfsck_layout_store(env, com);
823         up_write(&com->lc_sem);
824
825         return rc;
826 }
827
828 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
829                              struct dt_object *obj, const struct lu_fid *fid)
830 {
831         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
832         struct lu_seq_range      range  = { 0 };
833         struct lustre_mdt_attrs *lma;
834         int                      rc;
835
836         fld_range_set_any(&range);
837         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
838         if (rc == 0) {
839                 if (fld_range_is_ost(&range))
840                         return 1;
841
842                 return 0;
843         }
844
845         lma = &lfsck_env_info(env)->lti_lma;
846         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
847                           XATTR_NAME_LMA, BYPASS_CAPA);
848         if (rc == sizeof(*lma)) {
849                 lustre_lma_swab(lma);
850
851                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
852         }
853
854         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
855
856         return rc > 0;
857 }
858
859 static struct lfsck_layout_seq *
860 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
861 {
862         struct lfsck_layout_seq *lls;
863
864         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
865                 if (lls->lls_seq == seq)
866                         return lls;
867
868                 if (lls->lls_seq > seq)
869                         return NULL;
870         }
871
872         return NULL;
873 }
874
875 static void
876 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
877                         struct lfsck_layout_seq *lls)
878 {
879         struct lfsck_layout_seq *tmp;
880         struct list_head        *pos = &llsd->llsd_seq_list;
881
882         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
883                 if (lls->lls_seq < tmp->lls_seq) {
884                         pos = &tmp->lls_list;
885                         break;
886                 }
887         }
888         list_add_tail(&lls->lls_list, pos);
889 }
890
891 static int
892 lfsck_layout_lastid_create(const struct lu_env *env,
893                            struct lfsck_instance *lfsck,
894                            struct dt_object *obj)
895 {
896         struct lfsck_thread_info *info   = lfsck_env_info(env);
897         struct lu_attr           *la     = &info->lti_la;
898         struct dt_object_format  *dof    = &info->lti_dof;
899         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
900         struct dt_device         *dt     = lfsck->li_bottom;
901         struct thandle           *th;
902         __u64                     lastid = 0;
903         loff_t                    pos    = 0;
904         int                       rc;
905         ENTRY;
906
907         if (bk->lb_param & LPF_DRYRUN)
908                 return 0;
909
910         memset(la, 0, sizeof(*la));
911         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
912         la->la_valid = LA_MODE | LA_UID | LA_GID;
913         dof->dof_type = dt_mode_to_dft(S_IFREG);
914
915         th = dt_trans_create(env, dt);
916         if (IS_ERR(th))
917                 GOTO(log, rc = PTR_ERR(th));
918
919         rc = dt_declare_create(env, obj, la, NULL, dof, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         rc = dt_declare_record_write(env, obj,
924                                      lfsck_buf_get(env, &lastid,
925                                                    sizeof(lastid)),
926                                      pos, th);
927         if (rc != 0)
928                 GOTO(stop, rc);
929
930         rc = dt_trans_start_local(env, dt, th);
931         if (rc != 0)
932                 GOTO(stop, rc);
933
934         dt_write_lock(env, obj, 0);
935         if (likely(dt_object_exists(obj) == 0)) {
936                 rc = dt_create(env, obj, la, NULL, dof, th);
937                 if (rc == 0)
938                         rc = dt_record_write(env, obj,
939                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
940                                 &pos, th);
941         }
942         dt_write_unlock(env, obj);
943
944         GOTO(stop, rc);
945
946 stop:
947         dt_trans_stop(env, dt, th);
948
949 log:
950         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
951                LPX64": rc = %d\n",
952                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
953
954         return rc;
955 }
956
957 static int
958 lfsck_layout_lastid_reload(const struct lu_env *env,
959                            struct lfsck_component *com,
960                            struct lfsck_layout_seq *lls)
961 {
962         __u64   lastid;
963         loff_t  pos     = 0;
964         int     rc;
965
966         dt_read_lock(env, lls->lls_lastid_obj, 0);
967         rc = dt_record_read(env, lls->lls_lastid_obj,
968                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
969         dt_read_unlock(env, lls->lls_lastid_obj);
970         if (unlikely(rc != 0))
971                 return rc;
972
973         lastid = le64_to_cpu(lastid);
974         if (lastid < lls->lls_lastid_known) {
975                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
976                 struct lfsck_layout     *lo     = com->lc_file_ram;
977
978                 lls->lls_lastid = lls->lls_lastid_known;
979                 lls->lls_dirty = 1;
980                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
981                         LASSERT(lfsck->li_out_notify != NULL);
982
983                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
984                                              LE_LASTID_REBUILDING);
985                         lo->ll_flags |= LF_CRASHED_LASTID;
986
987                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds crashed "
988                                "LAST_ID file (1) for the sequence "LPX64
989                                ", old value "LPU64", known value "LPU64"\n",
990                                lfsck_lfsck2name(lfsck), lls->lls_seq,
991                                lastid, lls->lls_lastid);
992                 }
993         } else if (lastid >= lls->lls_lastid) {
994                 lls->lls_lastid = lastid;
995                 lls->lls_dirty = 0;
996         }
997
998         return 0;
999 }
1000
1001 static int
1002 lfsck_layout_lastid_store(const struct lu_env *env,
1003                           struct lfsck_component *com)
1004 {
1005         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1006         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1007         struct dt_device                *dt     = lfsck->li_bottom;
1008         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1009         struct lfsck_layout_seq         *lls;
1010         struct thandle                  *th;
1011         __u64                            lastid;
1012         int                              rc     = 0;
1013         int                              rc1    = 0;
1014
1015         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1016                 loff_t pos = 0;
1017
1018                 if (!lls->lls_dirty)
1019                         continue;
1020
1021                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1022                        "<seq> "LPX64" as <oid> "LPU64"\n",
1023                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1024
1025                 if (bk->lb_param & LPF_DRYRUN) {
1026                         lls->lls_dirty = 0;
1027                         continue;
1028                 }
1029
1030                 th = dt_trans_create(env, dt);
1031                 if (IS_ERR(th)) {
1032                         rc1 = PTR_ERR(th);
1033                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1034                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1035                                lfsck_lfsck2name(com->lc_lfsck),
1036                                lls->lls_seq, rc1);
1037                         continue;
1038                 }
1039
1040                 lastid = cpu_to_le64(lls->lls_lastid);
1041                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1042                                              lfsck_buf_get(env, &lastid,
1043                                                            sizeof(lastid)),
1044                                              pos, th);
1045                 if (rc != 0)
1046                         goto stop;
1047
1048                 rc = dt_trans_start_local(env, dt, th);
1049                 if (rc != 0)
1050                         goto stop;
1051
1052                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1053                 rc = dt_record_write(env, lls->lls_lastid_obj,
1054                                      lfsck_buf_get(env, &lastid,
1055                                      sizeof(lastid)), &pos, th);
1056                 dt_write_unlock(env, lls->lls_lastid_obj);
1057                 if (rc == 0)
1058                         lls->lls_dirty = 0;
1059
1060 stop:
1061                 dt_trans_stop(env, dt, th);
1062                 if (rc != 0) {
1063                         rc1 = rc;
1064                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1065                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1066                                lfsck_lfsck2name(com->lc_lfsck),
1067                                lls->lls_seq, rc1);
1068                 }
1069         }
1070
1071         return rc1;
1072 }
1073
1074 static int
1075 lfsck_layout_lastid_load(const struct lu_env *env,
1076                          struct lfsck_component *com,
1077                          struct lfsck_layout_seq *lls)
1078 {
1079         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1080         struct lfsck_layout     *lo     = com->lc_file_ram;
1081         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1082         struct dt_object        *obj;
1083         loff_t                   pos    = 0;
1084         int                      rc;
1085         ENTRY;
1086
1087         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1088         obj = dt_locate(env, lfsck->li_bottom, fid);
1089         if (IS_ERR(obj))
1090                 RETURN(PTR_ERR(obj));
1091
1092         /* LAST_ID crashed, to be rebuilt */
1093         if (dt_object_exists(obj) == 0) {
1094                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1095                         LASSERT(lfsck->li_out_notify != NULL);
1096
1097                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1098                                              LE_LASTID_REBUILDING);
1099                         lo->ll_flags |= LF_CRASHED_LASTID;
1100
1101                         CDEBUG(D_LFSCK, "%s: layout LFSCK cannot find the "
1102                                "LAST_ID file for sequence "LPX64"\n",
1103                                lfsck_lfsck2name(lfsck), lls->lls_seq);
1104
1105                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1106                             cfs_fail_val > 0) {
1107                                 struct l_wait_info lwi = LWI_TIMEOUT(
1108                                                 cfs_time_seconds(cfs_fail_val),
1109                                                 NULL, NULL);
1110
1111                                 up_write(&com->lc_sem);
1112                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1113                                              !thread_is_running(&lfsck->li_thread),
1114                                              &lwi);
1115                                 down_write(&com->lc_sem);
1116                         }
1117                 }
1118
1119                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1120         } else {
1121                 dt_read_lock(env, obj, 0);
1122                 rc = dt_read(env, obj,
1123                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1124                         &pos);
1125                 dt_read_unlock(env, obj);
1126                 if (rc != 0 && rc != sizeof(__u64))
1127                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1128
1129                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1130                         LASSERT(lfsck->li_out_notify != NULL);
1131
1132                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1133                                              LE_LASTID_REBUILDING);
1134                         lo->ll_flags |= LF_CRASHED_LASTID;
1135
1136                         CDEBUG(D_LFSCK, "%s: layout LFSCK finds invalid "
1137                                "LAST_ID file for the sequence "LPX64
1138                                ": rc = %d\n",
1139                                lfsck_lfsck2name(lfsck), lls->lls_seq, rc);
1140                 }
1141
1142                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1143                 rc = 0;
1144         }
1145
1146         GOTO(out, rc);
1147
1148 out:
1149         if (rc != 0)
1150                 lfsck_object_put(env, obj);
1151         else
1152                 lls->lls_lastid_obj = obj;
1153
1154         return rc;
1155 }
1156
1157 static void lfsck_layout_record_failure(const struct lu_env *env,
1158                                                  struct lfsck_instance *lfsck,
1159                                                  struct lfsck_layout *lo)
1160 {
1161         lo->ll_objs_failed_phase1++;
1162         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1163                 lo->ll_pos_first_inconsistent =
1164                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1165                                                         lfsck->li_di_oit);
1166
1167                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1168                        "inconsistency at the pos ["LPU64"]\n",
1169                        lfsck_lfsck2name(lfsck),
1170                        lo->ll_pos_first_inconsistent);
1171         }
1172 }
1173
1174 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1175                                                struct ptlrpc_request *req,
1176                                                void *args, int rc)
1177 {
1178         struct lfsck_async_interpret_args *laia = args;
1179         struct lfsck_component            *com  = laia->laia_com;
1180         struct lfsck_layout_master_data   *llmd = com->lc_data;
1181         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1182         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1183         struct lfsck_request              *lr   = laia->laia_lr;
1184
1185         switch (lr->lr_event) {
1186         case LE_START:
1187                 if (rc != 0) {
1188                         struct lfsck_layout *lo = com->lc_file_ram;
1189
1190                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1191                                "start: rc = %d\n",
1192                                lfsck_lfsck2name(com->lc_lfsck),
1193                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1194                                ltd->ltd_index, rc);
1195                         lo->ll_flags |= LF_INCOMPLETE;
1196                         break;
1197                 }
1198
1199                 spin_lock(&ltds->ltd_lock);
1200                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1201                         spin_unlock(&ltds->ltd_lock);
1202                         break;
1203                 }
1204
1205                 if (lr->lr_flags & LEF_TO_OST) {
1206                         if (list_empty(&ltd->ltd_layout_list))
1207                                 list_add_tail(&ltd->ltd_layout_list,
1208                                               &llmd->llmd_ost_list);
1209                         if (list_empty(&ltd->ltd_layout_phase_list))
1210                                 list_add_tail(&ltd->ltd_layout_phase_list,
1211                                               &llmd->llmd_ost_phase1_list);
1212                 } else {
1213                         if (list_empty(&ltd->ltd_layout_list))
1214                                 list_add_tail(&ltd->ltd_layout_list,
1215                                               &llmd->llmd_mdt_list);
1216                         if (list_empty(&ltd->ltd_layout_phase_list))
1217                                 list_add_tail(&ltd->ltd_layout_phase_list,
1218                                               &llmd->llmd_mdt_phase1_list);
1219                 }
1220                 spin_unlock(&ltds->ltd_lock);
1221                 break;
1222         case LE_STOP:
1223         case LE_PHASE1_DONE:
1224         case LE_PHASE2_DONE:
1225         case LE_PEER_EXIT:
1226                 if (rc != 0 && rc != -EALREADY)
1227                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1228                                "event = %d, rc = %d\n",
1229                                lfsck_lfsck2name(com->lc_lfsck),
1230                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1231                                ltd->ltd_index, lr->lr_event, rc);
1232                 break;
1233         case LE_QUERY: {
1234                 struct lfsck_reply *reply;
1235
1236                 if (rc != 0) {
1237                         spin_lock(&ltds->ltd_lock);
1238                         list_del_init(&ltd->ltd_layout_phase_list);
1239                         list_del_init(&ltd->ltd_layout_list);
1240                         spin_unlock(&ltds->ltd_lock);
1241                         break;
1242                 }
1243
1244                 reply = req_capsule_server_get(&req->rq_pill,
1245                                                &RMF_LFSCK_REPLY);
1246                 if (reply == NULL) {
1247                         rc = -EPROTO;
1248                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1249                                lfsck_lfsck2name(com->lc_lfsck), rc);
1250                         spin_lock(&ltds->ltd_lock);
1251                         list_del_init(&ltd->ltd_layout_phase_list);
1252                         list_del_init(&ltd->ltd_layout_list);
1253                         spin_unlock(&ltds->ltd_lock);
1254                         break;
1255                 }
1256
1257                 switch (reply->lr_status) {
1258                 case LS_SCANNING_PHASE1:
1259                         break;
1260                 case LS_SCANNING_PHASE2:
1261                         spin_lock(&ltds->ltd_lock);
1262                         list_del_init(&ltd->ltd_layout_phase_list);
1263                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1264                                 spin_unlock(&ltds->ltd_lock);
1265                                 break;
1266                         }
1267
1268                         if (lr->lr_flags & LEF_TO_OST)
1269                                 list_add_tail(&ltd->ltd_layout_phase_list,
1270                                               &llmd->llmd_ost_phase2_list);
1271                         else
1272                                 list_add_tail(&ltd->ltd_layout_phase_list,
1273                                               &llmd->llmd_mdt_phase2_list);
1274                         spin_unlock(&ltds->ltd_lock);
1275                         break;
1276                 default:
1277                         spin_lock(&ltds->ltd_lock);
1278                         list_del_init(&ltd->ltd_layout_phase_list);
1279                         list_del_init(&ltd->ltd_layout_list);
1280                         spin_unlock(&ltds->ltd_lock);
1281                         break;
1282                 }
1283                 break;
1284         }
1285         default:
1286                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1287                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1288                 break;
1289         }
1290
1291         if (!laia->laia_shared) {
1292                 lfsck_tgt_put(ltd);
1293                 lfsck_component_put(env, com);
1294         }
1295
1296         return 0;
1297 }
1298
1299 static int lfsck_layout_master_query_others(const struct lu_env *env,
1300                                             struct lfsck_component *com)
1301 {
1302         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1303         struct lfsck_request              *lr    = &info->lti_lr;
1304         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1305         struct lfsck_instance             *lfsck = com->lc_lfsck;
1306         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1307         struct ptlrpc_request_set         *set;
1308         struct lfsck_tgt_descs            *ltds;
1309         struct lfsck_tgt_desc             *ltd;
1310         struct list_head                  *head;
1311         int                                rc    = 0;
1312         int                                rc1   = 0;
1313         ENTRY;
1314
1315         set = ptlrpc_prep_set();
1316         if (set == NULL)
1317                 RETURN(-ENOMEM);
1318
1319         llmd->llmd_touch_gen++;
1320         memset(lr, 0, sizeof(*lr));
1321         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1322         lr->lr_event = LE_QUERY;
1323         lr->lr_active = LFSCK_TYPE_LAYOUT;
1324         laia->laia_com = com;
1325         laia->laia_lr = lr;
1326         laia->laia_shared = 0;
1327
1328         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1329                 ltds = &lfsck->li_mdt_descs;
1330                 lr->lr_flags = 0;
1331                 head = &llmd->llmd_mdt_phase1_list;
1332         } else {
1333
1334 again:
1335                 ltds = &lfsck->li_ost_descs;
1336                 lr->lr_flags = LEF_TO_OST;
1337                 head = &llmd->llmd_ost_phase1_list;
1338         }
1339
1340         laia->laia_ltds = ltds;
1341         spin_lock(&ltds->ltd_lock);
1342         while (!list_empty(head)) {
1343                 ltd = list_entry(head->next,
1344                                  struct lfsck_tgt_desc,
1345                                  ltd_layout_phase_list);
1346                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1347                         break;
1348
1349                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1350                 list_move_tail(&ltd->ltd_layout_phase_list, head);
1351                 atomic_inc(&ltd->ltd_ref);
1352                 laia->laia_ltd = ltd;
1353                 spin_unlock(&ltds->ltd_lock);
1354                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1355                                          lfsck_layout_master_async_interpret,
1356                                          laia, LFSCK_QUERY);
1357                 if (rc != 0) {
1358                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1359                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1360                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1361                                ltd->ltd_index, rc);
1362                         lfsck_tgt_put(ltd);
1363                         rc1 = rc;
1364                 }
1365                 spin_lock(&ltds->ltd_lock);
1366         }
1367         spin_unlock(&ltds->ltd_lock);
1368
1369         rc = ptlrpc_set_wait(set);
1370         if (rc < 0) {
1371                 ptlrpc_set_destroy(set);
1372                 RETURN(rc);
1373         }
1374
1375         if (!(lr->lr_flags & LEF_TO_OST) &&
1376             list_empty(&llmd->llmd_mdt_phase1_list))
1377                 goto again;
1378
1379         ptlrpc_set_destroy(set);
1380
1381         RETURN(rc1 != 0 ? rc1 : rc);
1382 }
1383
1384 static inline bool
1385 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1386 {
1387         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1388                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1389                 list_empty(&llmd->llmd_ost_phase1_list));
1390 }
1391
1392 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1393                                              struct lfsck_component *com,
1394                                              struct lfsck_request *lr)
1395 {
1396         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1397         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1398         struct lfsck_instance             *lfsck = com->lc_lfsck;
1399         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1400         struct lfsck_layout               *lo    = com->lc_file_ram;
1401         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1402         struct ptlrpc_request_set         *set;
1403         struct lfsck_tgt_descs            *ltds;
1404         struct lfsck_tgt_desc             *ltd;
1405         struct lfsck_tgt_desc             *next;
1406         struct list_head                  *head;
1407         __u32                              idx;
1408         int                                rc    = 0;
1409         ENTRY;
1410
1411         set = ptlrpc_prep_set();
1412         if (set == NULL)
1413                 RETURN(-ENOMEM);
1414
1415         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1416         lr->lr_active = LFSCK_TYPE_LAYOUT;
1417         laia->laia_com = com;
1418         laia->laia_lr = lr;
1419         laia->laia_shared = 0;
1420         switch (lr->lr_event) {
1421         case LE_START:
1422                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1423                 ltds = &lfsck->li_ost_descs;
1424                 laia->laia_ltds = ltds;
1425                 down_read(&ltds->ltd_rw_sem);
1426                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1427                         ltd = lfsck_tgt_get(ltds, idx);
1428                         LASSERT(ltd != NULL);
1429
1430                         laia->laia_ltd = ltd;
1431                         ltd->ltd_layout_done = 0;
1432                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1433                                         lfsck_layout_master_async_interpret,
1434                                         laia, LFSCK_NOTIFY);
1435                         if (rc != 0) {
1436                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1437                                        "notify %s %x for start: rc = %d\n",
1438                                        lfsck_lfsck2name(lfsck),
1439                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1440                                        "MDT", idx, rc);
1441                                 lfsck_tgt_put(ltd);
1442                                 lo->ll_flags |= LF_INCOMPLETE;
1443                         }
1444                 }
1445                 up_read(&ltds->ltd_rw_sem);
1446
1447                 /* Sync up */
1448                 rc = ptlrpc_set_wait(set);
1449                 if (rc < 0) {
1450                         ptlrpc_set_destroy(set);
1451                         RETURN(rc);
1452                 }
1453
1454                 if (!(bk->lb_param & LPF_ALL_TGT))
1455                         break;
1456
1457                 /* link other MDT targets locallly. */
1458                 ltds = &lfsck->li_mdt_descs;
1459                 spin_lock(&ltds->ltd_lock);
1460                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1461                         ltd = LTD_TGT(ltds, idx);
1462                         LASSERT(ltd != NULL);
1463
1464                         if (!list_empty(&ltd->ltd_layout_list))
1465                                 continue;
1466
1467                         list_add_tail(&ltd->ltd_layout_list,
1468                                       &llmd->llmd_mdt_list);
1469                         list_add_tail(&ltd->ltd_layout_phase_list,
1470                                       &llmd->llmd_mdt_phase1_list);
1471                 }
1472                 spin_unlock(&ltds->ltd_lock);
1473                 break;
1474         case LE_STOP:
1475         case LE_PHASE2_DONE:
1476         case LE_PEER_EXIT: {
1477                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1478                 if (bk->lb_param & LPF_ALL_TGT) {
1479                         head = &llmd->llmd_mdt_list;
1480                         ltds = &lfsck->li_mdt_descs;
1481                         if (lr->lr_event == LE_STOP) {
1482                                 /* unlink other MDT targets locallly. */
1483                                 spin_lock(&ltds->ltd_lock);
1484                                 list_for_each_entry_safe(ltd, next, head,
1485                                                          ltd_layout_list) {
1486                                         list_del_init(&ltd->ltd_layout_phase_list);
1487                                         list_del_init(&ltd->ltd_layout_list);
1488                                 }
1489                                 spin_unlock(&ltds->ltd_lock);
1490
1491                                 lr->lr_flags |= LEF_TO_OST;
1492                                 head = &llmd->llmd_ost_list;
1493                                 ltds = &lfsck->li_ost_descs;
1494                         } else {
1495                                 lr->lr_flags &= ~LEF_TO_OST;
1496                         }
1497                 } else {
1498                         lr->lr_flags |= LEF_TO_OST;
1499                         head = &llmd->llmd_ost_list;
1500                         ltds = &lfsck->li_ost_descs;
1501                 }
1502
1503 again:
1504                 laia->laia_ltds = ltds;
1505                 spin_lock(&ltds->ltd_lock);
1506                 while (!list_empty(head)) {
1507                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1508                                          ltd_layout_list);
1509                         if (!list_empty(&ltd->ltd_layout_phase_list))
1510                                 list_del_init(&ltd->ltd_layout_phase_list);
1511                         list_del_init(&ltd->ltd_layout_list);
1512                         atomic_inc(&ltd->ltd_ref);
1513                         laia->laia_ltd = ltd;
1514                         spin_unlock(&ltds->ltd_lock);
1515                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1516                                         lfsck_layout_master_async_interpret,
1517                                         laia, LFSCK_NOTIFY);
1518                         if (rc != 0) {
1519                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1520                                        "notify %s %x for stop/phase2_done/"
1521                                        "peer_exit: rc = %d\n",
1522                                        lfsck_lfsck2name(lfsck),
1523                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1524                                        "MDT", ltd->ltd_index, rc);
1525                                 lfsck_tgt_put(ltd);
1526                         }
1527                         spin_lock(&ltds->ltd_lock);
1528                 }
1529                 spin_unlock(&ltds->ltd_lock);
1530
1531                 rc = ptlrpc_set_wait(set);
1532                 if (rc < 0) {
1533                         ptlrpc_set_destroy(set);
1534                         RETURN(rc);
1535                 }
1536
1537                 if (!(lr->lr_flags & LEF_TO_OST)) {
1538                         lr->lr_flags |= LEF_TO_OST;
1539                         head = &llmd->llmd_ost_list;
1540                         ltds = &lfsck->li_ost_descs;
1541                         goto again;
1542                 }
1543                 break;
1544         }
1545         case LE_PHASE1_DONE:
1546                 llmd->llmd_touch_gen++;
1547                 ltds = &lfsck->li_mdt_descs;
1548                 laia->laia_ltds = ltds;
1549                 spin_lock(&ltds->ltd_lock);
1550                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1551                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1552                                          struct lfsck_tgt_desc,
1553                                          ltd_layout_phase_list);
1554                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1555                                 break;
1556
1557                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1558                         list_move_tail(&ltd->ltd_layout_phase_list,
1559                                        &llmd->llmd_mdt_phase1_list);
1560                         atomic_inc(&ltd->ltd_ref);
1561                         laia->laia_ltd = ltd;
1562                         spin_unlock(&ltds->ltd_lock);
1563                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1564                                         lfsck_layout_master_async_interpret,
1565                                         laia, LFSCK_NOTIFY);
1566                         if (rc != 0) {
1567                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1568                                        "notify MDT %x for phase1_done: "
1569                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1570                                        ltd->ltd_index, rc);
1571                                 lfsck_tgt_put(ltd);
1572                         }
1573                         spin_lock(&ltds->ltd_lock);
1574                 }
1575                 spin_unlock(&ltds->ltd_lock);
1576                 break;
1577         default:
1578                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1579                        lfsck_lfsck2name(lfsck), lr->lr_event);
1580                 rc = -EINVAL;
1581                 break;
1582         }
1583
1584         rc = ptlrpc_set_wait(set);
1585         ptlrpc_set_destroy(set);
1586
1587         RETURN(rc);
1588 }
1589
1590 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1591                                            struct lfsck_component *com,
1592                                            int rc)
1593 {
1594         struct lfsck_instance   *lfsck = com->lc_lfsck;
1595         struct lfsck_layout     *lo    = com->lc_file_ram;
1596         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1597
1598         down_write(&com->lc_sem);
1599         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1600                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1601         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1602         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1603
1604         if (rc > 0) {
1605                 com->lc_journal = 0;
1606                 if (lo->ll_flags & LF_INCOMPLETE)
1607                         lo->ll_status = LS_PARTIAL;
1608                 else
1609                         lo->ll_status = LS_COMPLETED;
1610                 if (!(bk->lb_param & LPF_DRYRUN))
1611                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1612                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1613                 lo->ll_success_count++;
1614         } else if (rc == 0) {
1615                 lo->ll_status = lfsck->li_status;
1616                 if (lo->ll_status == 0)
1617                         lo->ll_status = LS_STOPPED;
1618         } else {
1619                 lo->ll_status = LS_FAILED;
1620         }
1621
1622         rc = lfsck_layout_store(env, com);
1623         up_write(&com->lc_sem);
1624
1625         return rc;
1626 }
1627
1628 static int lfsck_layout_trans_stop(const struct lu_env *env,
1629                                    struct dt_device *dev,
1630                                    struct thandle *handle, int result)
1631 {
1632         int rc;
1633
1634         handle->th_result = result;
1635         rc = dt_trans_stop(env, dev, handle);
1636         if (rc > 0)
1637                 rc = 0;
1638         else if (rc == 0)
1639                 rc = 1;
1640
1641         return rc;
1642 }
1643
1644 /**
1645  * Get the system default stripe size.
1646  *
1647  * \param[in] env       pointer to the thread context
1648  * \param[in] lfsck     pointer to the lfsck instance
1649  * \param[out] size     pointer to the default stripe size
1650  *
1651  * \retval              0 for success
1652  * \retval              negative error number on failure
1653  */
1654 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1655                                            struct lfsck_instance *lfsck,
1656                                            __u32 *size)
1657 {
1658         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1659         struct dt_object        *root;
1660         int                      rc;
1661
1662         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1663         if (IS_ERR(root))
1664                 return PTR_ERR(root);
1665
1666         /* Get the default stripe size via xattr_get on the backend root. */
1667         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1668                           XATTR_NAME_LOV, BYPASS_CAPA);
1669         if (rc > 0) {
1670                 /* The lum->lmm_stripe_size is LE mode. The *size also
1671                  * should be LE mode. So it is unnecessary to convert. */
1672                 *size = lum->lmm_stripe_size;
1673                 rc = 0;
1674         } else if (unlikely(rc == 0)) {
1675                 rc = -EINVAL;
1676         }
1677
1678         lfsck_object_put(env, root);
1679
1680         return rc;
1681 }
1682
1683 /**
1684  * \retval       +1: repaired
1685  * \retval        0: did nothing
1686  * \retval      -ve: on error
1687  */
1688 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1689                                      struct thandle *handle,
1690                                      struct dt_object *parent,
1691                                      struct lu_fid *cfid,
1692                                      struct lu_buf *buf,
1693                                      struct lov_ost_data_v1 *slot,
1694                                      int fl, __u32 ost_idx)
1695 {
1696         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1697         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1698         struct lu_buf            ea_buf;
1699         int                      rc;
1700         __u32                    magic;
1701         __u16                    count;
1702
1703         magic = le32_to_cpu(lmm->lmm_magic);
1704         count = le16_to_cpu(lmm->lmm_stripe_count);
1705
1706         fid_to_ostid(cfid, oi);
1707         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1708         slot->l_ost_gen = cpu_to_le32(0);
1709         slot->l_ost_idx = cpu_to_le32(ost_idx);
1710
1711         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1712                 struct lov_ost_data_v1 *objs;
1713                 int                     i;
1714
1715                 if (magic == LOV_MAGIC_V1)
1716                         objs = &lmm->lmm_objects[0];
1717                 else
1718                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1719                 for (i = 0; i < count; i++, objs++) {
1720                         if (objs != slot && lovea_slot_is_dummy(objs))
1721                                 break;
1722                 }
1723
1724                 /* If the @slot is the last dummy slot to be refilled,
1725                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1726                 if (i == count)
1727                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1728         }
1729
1730         lfsck_buf_init(&ea_buf, lmm, lov_mds_md_size(count, magic));
1731         rc = dt_xattr_set(env, parent, &ea_buf, XATTR_NAME_LOV, fl, handle,
1732                           BYPASS_CAPA);
1733         if (rc == 0)
1734                 rc = 1;
1735
1736         return rc;
1737 }
1738
1739 /**
1740  * \retval       +1: repaired
1741  * \retval        0: did nothing
1742  * \retval      -ve: on error
1743  */
1744 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1745                                      struct lfsck_instance *lfsck,
1746                                      struct thandle *handle,
1747                                      struct dt_object *parent,
1748                                      struct lu_fid *cfid,
1749                                      struct lu_buf *buf, int fl,
1750                                      __u32 ost_idx, __u32 ea_off, bool reset)
1751 {
1752         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1753         struct lov_ost_data_v1  *objs;
1754         int                      rc;
1755         __u16                    count;
1756         bool                     hole   = false;
1757         ENTRY;
1758
1759         if (fl == LU_XATTR_CREATE || reset) {
1760                 __u32 pattern = LOV_PATTERN_RAID0;
1761
1762                 count = ea_off + 1;
1763                 LASSERT(buf->lb_len >= lov_mds_md_size(count, LOV_MAGIC_V1));
1764
1765                 if (ea_off != 0 || reset) {
1766                         pattern |= LOV_PATTERN_F_HOLE;
1767                         hole = true;
1768                 }
1769
1770                 memset(lmm, 0, buf->lb_len);
1771                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1772                 lmm->lmm_pattern = cpu_to_le32(pattern);
1773                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1774                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1775
1776                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1777                                                      &lmm->lmm_stripe_size);
1778                 if (rc != 0)
1779                         RETURN(rc);
1780
1781                 objs = &lmm->lmm_objects[ea_off];
1782         } else {
1783                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1784                 int     gap;
1785
1786                 count = le16_to_cpu(lmm->lmm_stripe_count);
1787                 if (magic == LOV_MAGIC_V1)
1788                         objs = &lmm->lmm_objects[count];
1789                 else
1790                         objs = &((struct lov_mds_md_v3 *)lmm)->
1791                                                         lmm_objects[count];
1792
1793                 gap = ea_off - count;
1794                 if (gap >= 0)
1795                         count = ea_off + 1;
1796                 LASSERT(buf->lb_len >= lov_mds_md_size(count, magic));
1797
1798                 if (gap > 0) {
1799                         memset(objs, 0, gap * sizeof(*objs));
1800                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1801                         hole = true;
1802                 }
1803
1804                 lmm->lmm_layout_gen =
1805                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1806                 objs += gap;
1807         }
1808
1809         lmm->lmm_stripe_count = cpu_to_le16(count);
1810         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1811                                        fl, ost_idx);
1812
1813         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1814                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1815                "reset %s, %s LOV EA hole: rc = %d\n",
1816                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1817                ost_idx, ea_off, fl, reset ? "yes" : "no",
1818                hole ? "with" : "without", rc);
1819
1820         RETURN(rc);
1821 }
1822
1823 /**
1824  * \retval       +1: repaired
1825  * \retval        0: did nothing
1826  * \retval      -ve: on error
1827  */
1828 static int lfsck_layout_update_pfid(const struct lu_env *env,
1829                                     struct lfsck_component *com,
1830                                     struct dt_object *parent,
1831                                     struct lu_fid *cfid,
1832                                     struct dt_device *cdev, __u32 ea_off)
1833 {
1834         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1835         struct dt_object        *child;
1836         struct thandle          *handle;
1837         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1838         struct lu_buf           *buf;
1839         int                      rc     = 0;
1840         ENTRY;
1841
1842         child = lfsck_object_find_by_dev(env, cdev, cfid);
1843         if (IS_ERR(child))
1844                 RETURN(PTR_ERR(child));
1845
1846         handle = dt_trans_create(env, cdev);
1847         if (IS_ERR(handle))
1848                 GOTO(out, rc = PTR_ERR(handle));
1849
1850         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1851         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1852         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1853          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1854          * parent MDT-object's layout EA. */
1855         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1856         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1857
1858         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1859         if (rc != 0)
1860                 GOTO(stop, rc);
1861
1862         rc = dt_trans_start(env, cdev, handle);
1863         if (rc != 0)
1864                 GOTO(stop, rc);
1865
1866         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1867                           BYPASS_CAPA);
1868
1869         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1870
1871 stop:
1872         dt_trans_stop(env, cdev, handle);
1873
1874 out:
1875         lu_object_put(env, &child->do_lu);
1876
1877         return rc;
1878 }
1879
1880 /**
1881  * This function will create the MDT-object with the given (partial) LOV EA.
1882  *
1883  * Under some data corruption cases, the MDT-object of the file may be lost,
1884  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1885  * re-create the MDT-object with the orphan OST-object(s) information.
1886  *
1887  * On the other hand, the LFSCK may has created some OST-object for repairing
1888  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1889  * the old OST-object is there and should replace the former new created OST
1890  * object. Unfortunately, some others have modified such newly created object.
1891  * To keep the data (both new and old), the LFSCK will create MDT-object with
1892  * new FID to reference the original OST-object.
1893  *
1894  * \param[in] env       pointer to the thread context
1895  * \param[in] com       pointer to the lfsck component
1896  * \param[in] ltd       pointer to target device descriptor
1897  * \param[in] rec       pointer to the record for the orphan OST-object
1898  * \param[in] cfid      pointer to FID for the orphan OST-object
1899  * \param[in] infix     additional information, such as the FID for original
1900  *                      MDT-object and the stripe offset in the LOV EA
1901  * \param[in] type      the type for describing why the orphan MDT-object is
1902  *                      created. The rules are as following:
1903  *
1904  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1905  *                      same slot in the layout EA. Then the LFSCK will create
1906  *                      new MDT-object(s) to hold the conflict OST-object(s).
1907  *
1908  *  type "N":           The orphan OST-object does not know which one was the
1909  *                      real parent MDT-object, so the LFSCK uses new FID for
1910  *                      its parent MDT-object.
1911  *
1912  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1913  *                      but does not know the position (the file name) in the
1914  *                      namespace.
1915  *
1916  * The orphan name will be like:
1917  * ${FID}-${infix}-${type}-${conflict_version}
1918  *
1919  * \param[in] ea_off    the stripe offset in the LOV EA
1920  *
1921  * \retval              positive on repaired something
1922  * \retval              0 if needs to repair nothing
1923  * \retval              negative error number on failure
1924  */
1925 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1926                                         struct lfsck_component *com,
1927                                         struct lfsck_tgt_desc *ltd,
1928                                         struct lu_orphan_rec *rec,
1929                                         struct lu_fid *cfid,
1930                                         const char *infix,
1931                                         const char *type,
1932                                         __u32 ea_off)
1933 {
1934         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1935         struct dt_insert_rec            *dtrec  = &info->lti_dt_rec;
1936         char                            *name   = info->lti_key;
1937         struct lu_attr                  *la     = &info->lti_la;
1938         struct dt_object_format         *dof    = &info->lti_dof;
1939         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1940         struct lu_fid                   *pfid   = &rec->lor_fid;
1941         struct lu_fid                   *tfid   = &info->lti_fid3;
1942         struct dt_device                *next   = lfsck->li_next;
1943         struct dt_object                *pobj   = NULL;
1944         struct dt_object                *cobj   = NULL;
1945         struct thandle                  *th     = NULL;
1946         struct lu_buf                    pbuf   = { 0 };
1947         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1948         struct lu_buf                    lov_buf;
1949         struct lustre_handle             lh     = { 0 };
1950         struct linkea_data               ldata  = { 0 };
1951         struct lu_buf                    linkea_buf;
1952         const struct lu_name            *pname;
1953         int                              size   = 0;
1954         int                              idx    = 0;
1955         int                              rc     = 0;
1956         ENTRY;
1957
1958         /* Create .lustre/lost+found/MDTxxxx when needed. */
1959         if (unlikely(lfsck->li_lpf_obj == NULL)) {
1960                 rc = lfsck_create_lpf(env, lfsck);
1961                 if (rc != 0)
1962                         GOTO(log, rc);
1963         }
1964
1965         if (fid_is_zero(pfid)) {
1966                 struct filter_fid *ff = &info->lti_new_pfid;
1967
1968                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
1969                 if (rc != 0)
1970                         RETURN(rc);
1971
1972                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
1973                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
1974                 /* Currently, the filter_fid::ff_parent::f_ver is not the
1975                  * real parent MDT-object's FID::f_ver, instead it is the
1976                  * OST-object index in its parent MDT-object's layout EA. */
1977                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1978                 lfsck_buf_init(&pbuf, ff, sizeof(struct filter_fid));
1979                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
1980                 if (IS_ERR(cobj))
1981                         GOTO(log, rc = PTR_ERR(cobj));
1982         }
1983
1984         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
1985         if (IS_ERR(pobj))
1986                 GOTO(put, rc = PTR_ERR(pobj));
1987
1988         LASSERT(infix != NULL);
1989         LASSERT(type != NULL);
1990
1991         do {
1992                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
1993                          type, idx++);
1994                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
1995                                (const struct dt_key *)name, BYPASS_CAPA);
1996                 if (rc != 0 && rc != -ENOENT)
1997                         GOTO(put, rc);
1998         } while (rc == 0);
1999
2000         rc = linkea_data_new(&ldata,
2001                              &lfsck_env_info(env)->lti_linkea_buf);
2002         if (rc != 0)
2003                 GOTO(put, rc);
2004
2005         pname = lfsck_name_get_const(env, name, strlen(name));
2006         rc = linkea_add_buf(&ldata, pname, lfsck_dto2fid(lfsck->li_lpf_obj));
2007         if (rc != 0)
2008                 GOTO(put, rc);
2009
2010         memset(la, 0, sizeof(*la));
2011         la->la_uid = rec->lor_uid;
2012         la->la_gid = rec->lor_gid;
2013         la->la_mode = S_IFREG | S_IRUSR;
2014         la->la_valid = LA_MODE | LA_UID | LA_GID;
2015
2016         memset(dof, 0, sizeof(*dof));
2017         dof->dof_type = dt_mode_to_dft(S_IFREG);
2018
2019         size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2020         if (ea_buf->lb_len < size) {
2021                 lu_buf_realloc(ea_buf, size);
2022                 if (ea_buf->lb_buf == NULL)
2023                         GOTO(put, rc = -ENOMEM);
2024         }
2025
2026         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2027          *
2028          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2029          *      because creating MDT-object for orphan OST-object is rare, we
2030          *      do not much care about the performance. It can be improved in
2031          *      the future when needed. */
2032         rc = lfsck_ibits_lock(env, lfsck, lfsck->li_lpf_obj, &lh,
2033                               MDS_INODELOCK_UPDATE, LCK_EX);
2034         if (rc != 0)
2035                 GOTO(put, rc);
2036
2037         th = dt_trans_create(env, next);
2038         if (IS_ERR(th))
2039                 GOTO(unlock, rc = PTR_ERR(th));
2040
2041         /* 1a. Update OST-object's parent information remotely.
2042          *
2043          * If other subsequent modifications failed, then next LFSCK scanning
2044          * will process the OST-object as orphan again with known parent FID. */
2045         if (cobj != NULL) {
2046                 rc = dt_declare_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID,
2047                                           0, th);
2048                 if (rc != 0)
2049                         GOTO(stop, rc);
2050         }
2051
2052         /* 2a. Create the MDT-object locally. */
2053         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2054         if (rc != 0)
2055                 GOTO(stop, rc);
2056
2057         /* 3a. Add layout EA for the MDT-object. */
2058         lfsck_buf_init(&lov_buf, ea_buf->lb_buf, size);
2059         rc = dt_declare_xattr_set(env, pobj, &lov_buf, XATTR_NAME_LOV,
2060                                   LU_XATTR_CREATE, th);
2061         if (rc != 0)
2062                 GOTO(stop, rc);
2063
2064         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2065         dtrec->rec_fid = pfid;
2066         dtrec->rec_type = S_IFREG;
2067         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2068                                (const struct dt_rec *)dtrec,
2069                                (const struct dt_key *)name, th);
2070         if (rc != 0)
2071                 GOTO(stop, rc);
2072
2073         /* 5a. insert linkEA for parent. */
2074         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2075                        ldata.ld_leh->leh_len);
2076         rc = dt_declare_xattr_set(env, pobj, &linkea_buf,
2077                                   XATTR_NAME_LINK, 0, th);
2078         if (rc != 0)
2079                 GOTO(stop, rc);
2080
2081         rc = dt_trans_start(env, next, th);
2082         if (rc != 0)
2083                 GOTO(stop, rc);
2084
2085         /* 1b. Update OST-object's parent information remotely. */
2086         if (cobj != NULL) {
2087                 rc = dt_xattr_set(env, cobj, &pbuf, XATTR_NAME_FID, 0, th,
2088                                   BYPASS_CAPA);
2089                 if (rc != 0)
2090                         GOTO(stop, rc);
2091         }
2092
2093         dt_write_lock(env, pobj, 0);
2094         /* 2b. Create the MDT-object locally. */
2095         rc = dt_create(env, pobj, la, NULL, dof, th);
2096         if (rc == 0)
2097                 /* 3b. Add layout EA for the MDT-object. */
2098                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2099                                                &lov_buf, LU_XATTR_CREATE,
2100                                                ltd->ltd_index, ea_off, false);
2101         dt_write_unlock(env, pobj);
2102         if (rc < 0)
2103                 GOTO(stop, rc);
2104
2105         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2106         rc = dt_insert(env, lfsck->li_lpf_obj, (const struct dt_rec *)dtrec,
2107                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2108         if (rc != 0)
2109                 GOTO(stop, rc);
2110
2111         /* 5b. insert linkEA for parent. */
2112         rc = dt_xattr_set(env, pobj, &linkea_buf,
2113                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
2114
2115         GOTO(stop, rc);
2116
2117 stop:
2118         dt_trans_stop(env, next, th);
2119
2120 unlock:
2121         lfsck_ibits_unlock(&lh, LCK_EX);
2122
2123 put:
2124         if (cobj != NULL && !IS_ERR(cobj))
2125                 lu_object_put(env, &cobj->do_lu);
2126         if (pobj != NULL && !IS_ERR(pobj))
2127                 lu_object_put(env, &pobj->do_lu);
2128
2129 log:
2130         if (rc < 0)
2131                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2132                        "recreate the lost MDT-object: parent "DFID
2133                        ", child "DFID", OST-index %u, stripe-index %u, "
2134                        "infix %s, type %s: rc = %d\n",
2135                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2136                        ltd->ltd_index, ea_off, infix, type, rc);
2137
2138         return rc >= 0 ? 1 : rc;
2139 }
2140
2141 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2142                                                    struct lfsck_component *com,
2143                                                    const struct lu_fid *fid,
2144                                                    __u32 index)
2145 {
2146         struct lfsck_thread_info *info  = lfsck_env_info(env);
2147         struct lfsck_request     *lr    = &info->lti_lr;
2148         struct lfsck_instance    *lfsck = com->lc_lfsck;
2149         struct lfsck_tgt_desc    *ltd;
2150         struct ptlrpc_request    *req;
2151         struct lfsck_request     *tmp;
2152         struct obd_export        *exp;
2153         int                       rc    = 0;
2154         ENTRY;
2155
2156         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2157         if (unlikely(ltd == NULL))
2158                 RETURN(-ENXIO);
2159
2160         exp = ltd->ltd_exp;
2161         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2162                 GOTO(put, rc = -EOPNOTSUPP);
2163
2164         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2165         if (req == NULL)
2166                 GOTO(put, rc = -ENOMEM);
2167
2168         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2169         if (rc != 0) {
2170                 ptlrpc_request_free(req);
2171
2172                 GOTO(put, rc);
2173         }
2174
2175         memset(lr, 0, sizeof(*lr));
2176         lr->lr_event = LE_CONDITIONAL_DESTROY;
2177         lr->lr_active = LFSCK_TYPE_LAYOUT;
2178         lr->lr_fid = *fid;
2179
2180         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2181         *tmp = *lr;
2182         ptlrpc_request_set_replen(req);
2183
2184         rc = ptlrpc_queue_wait(req);
2185         ptlrpc_req_finished(req);
2186
2187         GOTO(put, rc);
2188
2189 put:
2190         lfsck_tgt_put(ltd);
2191
2192         return rc;
2193 }
2194
2195 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2196                                                   struct lfsck_component *com,
2197                                                   struct lfsck_request *lr)
2198 {
2199         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2200         struct lu_attr                  *la     = &info->lti_la;
2201         ldlm_policy_data_t              *policy = &info->lti_policy;
2202         struct ldlm_res_id              *resid  = &info->lti_resid;
2203         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2204         struct dt_device                *dev    = lfsck->li_bottom;
2205         struct lu_fid                   *fid    = &lr->lr_fid;
2206         struct dt_object                *obj;
2207         struct thandle                  *th     = NULL;
2208         struct lustre_handle             lh     = { 0 };
2209         __u64                            flags  = 0;
2210         int                              rc     = 0;
2211         ENTRY;
2212
2213         obj = lfsck_object_find_by_dev(env, dev, fid);
2214         if (IS_ERR(obj))
2215                 RETURN(PTR_ERR(obj));
2216
2217         dt_read_lock(env, obj, 0);
2218         if (dt_object_exists(obj) == 0 ||
2219             lfsck_is_dead_obj(obj)) {
2220                 dt_read_unlock(env, obj);
2221
2222                 GOTO(put, rc = -ENOENT);
2223         }
2224
2225         /* Get obj's attr without lock firstly. */
2226         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2227         dt_read_unlock(env, obj);
2228         if (rc != 0)
2229                 GOTO(put, rc);
2230
2231         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2232                 GOTO(put, rc = -ETXTBSY);
2233
2234         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2235         LASSERT(lfsck->li_namespace != NULL);
2236
2237         memset(policy, 0, sizeof(*policy));
2238         policy->l_extent.end = OBD_OBJECT_EOF;
2239         ost_fid_build_resid(fid, resid);
2240         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2241                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2242                                     ldlm_completion_ast, NULL, NULL, 0,
2243                                     LVB_T_NONE, NULL, &lh);
2244         if (rc != ELDLM_OK)
2245                 GOTO(put, rc = -EIO);
2246
2247         dt_write_lock(env, obj, 0);
2248         /* Get obj's attr within lock again. */
2249         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2250         if (rc != 0)
2251                 GOTO(unlock, rc);
2252
2253         if (la->la_ctime != 0)
2254                 GOTO(unlock, rc = -ETXTBSY);
2255
2256         th = dt_trans_create(env, dev);
2257         if (IS_ERR(th))
2258                 GOTO(unlock, rc = PTR_ERR(th));
2259
2260         rc = dt_declare_ref_del(env, obj, th);
2261         if (rc != 0)
2262                 GOTO(stop, rc);
2263
2264         rc = dt_declare_destroy(env, obj, th);
2265         if (rc != 0)
2266                 GOTO(stop, rc);
2267
2268         rc = dt_trans_start_local(env, dev, th);
2269         if (rc != 0)
2270                 GOTO(stop, rc);
2271
2272         rc = dt_ref_del(env, obj, th);
2273         if (rc != 0)
2274                 GOTO(stop, rc);
2275
2276         rc = dt_destroy(env, obj, th);
2277         if (rc == 0)
2278                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2279                        "OST-object "DFID" that was created for reparing "
2280                        "dangling referenced case. But the original missed "
2281                        "OST-object is found now.\n",
2282                        lfsck_lfsck2name(lfsck), PFID(fid));
2283
2284         GOTO(stop, rc);
2285
2286 stop:
2287         dt_trans_stop(env, dev, th);
2288
2289 unlock:
2290         dt_write_unlock(env, obj);
2291         ldlm_lock_decref(&lh, LCK_EX);
2292
2293 put:
2294         lu_object_put(env, &obj->do_lu);
2295
2296         return rc;
2297 }
2298
2299 /**
2300  * Some OST-object has occupied the specified layout EA slot.
2301  * Such OST-object may be generated by the LFSCK when repair
2302  * dangling referenced MDT-object, which can be indicated by
2303  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2304  * is true and such OST-object has not been modified yet, we
2305  * will replace it with the orphan OST-object; otherwise the
2306  * LFSCK will create new MDT-object to reference the orphan.
2307  *
2308  * \retval       +1: repaired
2309  * \retval        0: did nothing
2310  * \retval      -ve: on error
2311  */
2312 static int lfsck_layout_conflict_create(const struct lu_env *env,
2313                                         struct lfsck_component *com,
2314                                         struct lfsck_tgt_desc *ltd,
2315                                         struct lu_orphan_rec *rec,
2316                                         struct dt_object *parent,
2317                                         struct lu_fid *cfid,
2318                                         struct lu_buf *ea_buf,
2319                                         struct lov_ost_data_v1 *slot,
2320                                         __u32 ea_off)
2321 {
2322         struct lfsck_thread_info *info          = lfsck_env_info(env);
2323         struct lu_fid            *cfid2         = &info->lti_fid2;
2324         struct ost_id            *oi            = &info->lti_oi;
2325         char                     *infix         = info->lti_tmpbuf;
2326         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2327         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2328         struct thandle           *th            = NULL;
2329         struct lustre_handle      lh            = { 0 };
2330         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2331         int                       rc            = 0;
2332         ENTRY;
2333
2334         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2335         rc = ostid_to_fid(cfid2, oi, ost_idx2);
2336         if (rc != 0)
2337                 GOTO(out, rc);
2338
2339         /* Hold layout lock on the parent to prevent others to access. */
2340         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2341                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2342                               LCK_EX);
2343         if (rc != 0)
2344                 GOTO(out, rc);
2345
2346         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2347
2348         /* If the conflict OST-obejct is not created for fixing dangling
2349          * referenced MDT-object in former LFSCK check/repair, or it has
2350          * been modified by others, then we cannot destroy it. Re-create
2351          * a new MDT-object for the orphan OST-object. */
2352         if (rc == -ETXTBSY) {
2353                 /* No need the layout lock on the original parent. */
2354                 lfsck_ibits_unlock(&lh, LCK_EX);
2355
2356                 fid_zero(&rec->lor_fid);
2357                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2358                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2359                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2360                                                   infix, "C", ea_off);
2361
2362                 RETURN(rc);
2363         }
2364
2365         if (rc != 0 && rc != -ENOENT)
2366                 GOTO(unlock, rc);
2367
2368         th = dt_trans_create(env, dev);
2369         if (IS_ERR(th))
2370                 GOTO(unlock, rc = PTR_ERR(th));
2371
2372         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2373                                   LU_XATTR_REPLACE, th);
2374         if (rc != 0)
2375                 GOTO(stop, rc);
2376
2377         rc = dt_trans_start_local(env, dev, th);
2378         if (rc != 0)
2379                 GOTO(stop, rc);
2380
2381         dt_write_lock(env, parent, 0);
2382         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2383         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2384                                        LU_XATTR_REPLACE, ltd->ltd_index);
2385         dt_write_unlock(env, parent);
2386
2387         GOTO(stop, rc);
2388
2389 stop:
2390         dt_trans_stop(env, dev, th);
2391
2392 unlock:
2393         lfsck_ibits_unlock(&lh, LCK_EX);
2394
2395 out:
2396         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2397                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2398                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2399                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2400                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2401                ea_off, rc);
2402
2403         return rc >= 0 ? 1 : rc;
2404 }
2405
2406 /**
2407  * \retval       +1: repaired
2408  * \retval        0: did nothing
2409  * \retval      -ve: on error
2410  */
2411 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2412                                        struct lfsck_component *com,
2413                                        struct lfsck_tgt_desc *ltd,
2414                                        struct lu_orphan_rec *rec,
2415                                        struct dt_object *parent,
2416                                        struct lu_fid *cfid,
2417                                        __u32 ost_idx, __u32 ea_off)
2418 {
2419         struct lfsck_thread_info *info          = lfsck_env_info(env);
2420         struct lu_buf            *buf           = &info->lti_big_buf;
2421         struct lu_fid            *fid           = &info->lti_fid2;
2422         struct ost_id            *oi            = &info->lti_oi;
2423         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2424         struct dt_device         *dt            = lfsck->li_bottom;
2425         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2426         struct thandle            *handle       = NULL;
2427         size_t                    lovea_size;
2428         struct lov_mds_md_v1     *lmm;
2429         struct lov_ost_data_v1   *objs;
2430         struct lustre_handle      lh            = { 0 };
2431         __u32                     magic;
2432         int                       fl            = 0;
2433         int                       rc            = 0;
2434         int                       rc1;
2435         int                       i;
2436         __u16                     count;
2437         bool                      locked        = false;
2438         ENTRY;
2439
2440         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2441                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2442                               LCK_EX);
2443         if (rc != 0) {
2444                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2445                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2446                        "stripe-index %u: rc = %d\n",
2447                        lfsck_lfsck2name(lfsck), PFID(cfid),
2448                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2449
2450                 RETURN(rc);
2451         }
2452
2453 again:
2454         if (locked) {
2455                 dt_write_unlock(env, parent);
2456                 locked = false;
2457         }
2458
2459         if (handle != NULL) {
2460                 dt_trans_stop(env, dt, handle);
2461                 handle = NULL;
2462         }
2463
2464         if (rc < 0)
2465                 GOTO(unlock_layout, rc);
2466
2467         lovea_size = rc;
2468         if (buf->lb_len < lovea_size) {
2469                 lu_buf_realloc(buf, lovea_size);
2470                 if (buf->lb_buf == NULL)
2471                         GOTO(unlock_layout, rc = -ENOMEM);
2472         }
2473
2474         if (!(bk->lb_param & LPF_DRYRUN)) {
2475                 handle = dt_trans_create(env, dt);
2476                 if (IS_ERR(handle))
2477                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2478
2479                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2480                                           fl, handle);
2481                 if (rc != 0)
2482                         GOTO(stop, rc);
2483
2484                 rc = dt_trans_start_local(env, dt, handle);
2485                 if (rc != 0)
2486                         GOTO(stop, rc);
2487         }
2488
2489         dt_write_lock(env, parent, 0);
2490         locked = true;
2491         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2492         if (rc == -ERANGE) {
2493                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2494                                   BYPASS_CAPA);
2495                 LASSERT(rc != 0);
2496                 goto again;
2497         } else if (rc == -ENODATA || rc == 0) {
2498                 lovea_size = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2499                 /* If the declared is not big enough, re-try. */
2500                 if (buf->lb_len < lovea_size) {
2501                         rc = lovea_size;
2502                         goto again;
2503                 }
2504                 fl = LU_XATTR_CREATE;
2505         } else if (rc < 0) {
2506                 GOTO(unlock_parent, rc);
2507         } else if (unlikely(buf->lb_len == 0)) {
2508                 goto again;
2509         } else {
2510                 fl = LU_XATTR_REPLACE;
2511                 lovea_size = rc;
2512         }
2513
2514         if (fl == LU_XATTR_CREATE) {
2515                 if (bk->lb_param & LPF_DRYRUN)
2516                         GOTO(unlock_parent, rc = 1);
2517
2518                 LASSERT(buf->lb_len >= lovea_size);
2519
2520                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2521                                                buf, fl, ost_idx, ea_off, false);
2522
2523                 GOTO(unlock_parent, rc);
2524         }
2525
2526         lmm = buf->lb_buf;
2527         rc1 = lfsck_layout_verify_header(lmm);
2528
2529         /* If the LOV EA crashed, the rebuild it. */
2530         if (rc1 == -EINVAL) {
2531                 if (bk->lb_param & LPF_DRYRUN)
2532                         GOTO(unlock_parent, rc = 1);
2533
2534                 LASSERT(buf->lb_len >= lovea_size);
2535
2536                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2537                                                buf, fl, ost_idx, ea_off, true);
2538
2539                 GOTO(unlock_parent, rc);
2540         }
2541
2542         /* For other unknown magic/pattern, keep the current LOV EA. */
2543         if (rc1 != 0)
2544                 GOTO(unlock_parent, rc = rc1);
2545
2546         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2547          * been verified in lfsck_layout_verify_header() already. If some
2548          * new magic introduced in the future, then layout LFSCK needs to
2549          * be updated also. */
2550         magic = le32_to_cpu(lmm->lmm_magic);
2551         if (magic == LOV_MAGIC_V1) {
2552                 objs = &lmm->lmm_objects[0];
2553         } else {
2554                 LASSERT(magic == LOV_MAGIC_V3);
2555                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2556         }
2557
2558         count = le16_to_cpu(lmm->lmm_stripe_count);
2559         if (count == 0)
2560                 GOTO(unlock_parent, rc = -EINVAL);
2561         LASSERT(count > 0);
2562
2563         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2564         if (count <= ea_off) {
2565                 if (bk->lb_param & LPF_DRYRUN)
2566                         GOTO(unlock_parent, rc = 1);
2567
2568                 lovea_size = lov_mds_md_size(ea_off + 1, magic);
2569                 /* If the declared is not big enough, re-try. */
2570                 if (buf->lb_len < lovea_size) {
2571                         rc = lovea_size;
2572                         goto again;
2573                 }
2574
2575                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2576                                                buf, fl, ost_idx, ea_off, false);
2577
2578                 GOTO(unlock_parent, rc);
2579         }
2580
2581         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2582
2583         for (i = 0; i < count; i++, objs++) {
2584                 /* The MDT-object was created via lfsck_layout_recover_create()
2585                  * by others before, and we fill the dummy layout EA. */
2586                 if (lovea_slot_is_dummy(objs)) {
2587                         if (i != ea_off)
2588                                 continue;
2589
2590                         if (bk->lb_param & LPF_DRYRUN)
2591                                 GOTO(unlock_parent, rc = 1);
2592
2593                         lmm->lmm_layout_gen =
2594                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2595                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2596                                                        cfid, buf, objs, fl,
2597                                                        ost_idx);
2598
2599                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2600                                "dummy layout slot for "DFID": parent "DFID
2601                                ", OST-index %u, stripe-index %u: rc = %d\n",
2602                                lfsck_lfsck2name(lfsck), PFID(cfid),
2603                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2604
2605                         GOTO(unlock_parent, rc);
2606                 }
2607
2608                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2609                 rc = ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2610                 if (rc != 0) {
2611                         CDEBUG(D_LFSCK, "%s: the parent "DFID" contains "
2612                                "invalid layout EA at the slot %d, index %u\n",
2613                                lfsck_lfsck2name(lfsck),
2614                                PFID(lfsck_dto2fid(parent)), i,
2615                                le32_to_cpu(objs->l_ost_idx));
2616
2617                         GOTO(unlock_parent, rc);
2618                 }
2619
2620                 /* It should be rare case, the slot is there, but the LFSCK
2621                  * does not handle it during the first-phase cycle scanning. */
2622                 if (unlikely(lu_fid_eq(fid, cfid))) {
2623                         if (i == ea_off) {
2624                                 GOTO(unlock_parent, rc = 0);
2625                         } else {
2626                                 /* Rare case that the OST-object index
2627                                  * does not match the parent MDT-object
2628                                  * layout EA. We trust the later one. */
2629                                 if (bk->lb_param & LPF_DRYRUN)
2630                                         GOTO(unlock_parent, rc = 1);
2631
2632                                 dt_write_unlock(env, parent);
2633                                 if (handle != NULL)
2634                                         dt_trans_stop(env, dt, handle);
2635                                 lfsck_ibits_unlock(&lh, LCK_EX);
2636                                 rc = lfsck_layout_update_pfid(env, com, parent,
2637                                                         cfid, ltd->ltd_tgt, i);
2638
2639                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2640                                        "updated OST-object's pfid for "DFID
2641                                        ": parent "DFID", OST-index %u, "
2642                                        "stripe-index %u: rc = %d\n",
2643                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2644                                        PFID(lfsck_dto2fid(parent)),
2645                                        ltd->ltd_index, i, rc);
2646
2647                                 RETURN(rc);
2648                         }
2649                 }
2650         }
2651
2652         /* The MDT-object exists, but related layout EA slot is occupied
2653          * by others. */
2654         if (bk->lb_param & LPF_DRYRUN)
2655                 GOTO(unlock_parent, rc = 1);
2656
2657         dt_write_unlock(env, parent);
2658         if (handle != NULL)
2659                 dt_trans_stop(env, dt, handle);
2660         lfsck_ibits_unlock(&lh, LCK_EX);
2661         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2662                 objs = &lmm->lmm_objects[ea_off];
2663         else
2664                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2665         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2666                                           buf, objs, ea_off);
2667
2668         RETURN(rc);
2669
2670 unlock_parent:
2671         if (locked)
2672                 dt_write_unlock(env, parent);
2673
2674 stop:
2675         if (handle != NULL)
2676                 dt_trans_stop(env, dt, handle);
2677
2678 unlock_layout:
2679         lfsck_ibits_unlock(&lh, LCK_EX);
2680
2681         return rc;
2682 }
2683
2684 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2685                                         struct lfsck_component *com,
2686                                         struct lfsck_tgt_desc *ltd,
2687                                         struct lu_orphan_rec *rec,
2688                                         struct lu_fid *cfid)
2689 {
2690         struct lfsck_layout     *lo     = com->lc_file_ram;
2691         struct lu_fid           *pfid   = &rec->lor_fid;
2692         struct dt_object        *parent = NULL;
2693         __u32                    ea_off = pfid->f_stripe_idx;
2694         int                      rc     = 0;
2695         ENTRY;
2696
2697         if (!fid_is_sane(cfid))
2698                 GOTO(out, rc = -EINVAL);
2699
2700         if (fid_is_zero(pfid)) {
2701                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2702                                                   "", "N", ea_off);
2703                 GOTO(out, rc);
2704         }
2705
2706         pfid->f_ver = 0;
2707         if (!fid_is_sane(pfid))
2708                 GOTO(out, rc = -EINVAL);
2709
2710         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2711         if (IS_ERR(parent))
2712                 GOTO(out, rc = PTR_ERR(parent));
2713
2714         if (unlikely(dt_object_remote(parent) != 0))
2715                 GOTO(put, rc = -EXDEV);
2716
2717         if (dt_object_exists(parent) == 0) {
2718                 lu_object_put(env, &parent->do_lu);
2719                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2720                                                   "", "R", ea_off);
2721                 GOTO(out, rc);
2722         }
2723
2724         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2725                 GOTO(put, rc = -EISDIR);
2726
2727         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2728                                          ltd->ltd_index, ea_off);
2729
2730         GOTO(put, rc);
2731
2732 put:
2733         if (rc <= 0)
2734                 lu_object_put(env, &parent->do_lu);
2735         else
2736                 /* The layout EA is changed, need to be reloaded next time. */
2737                 lu_object_put_nocache(env, &parent->do_lu);
2738
2739 out:
2740         down_write(&com->lc_sem);
2741         com->lc_new_scanned++;
2742         com->lc_new_checked++;
2743         if (rc > 0) {
2744                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2745                 rc = 0;
2746         } else if (rc < 0) {
2747                 lo->ll_objs_failed_phase2++;
2748         }
2749         up_write(&com->lc_sem);
2750
2751         return rc;
2752 }
2753
2754 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2755                                     struct lfsck_component *com,
2756                                     struct lfsck_tgt_desc *ltd)
2757 {
2758         struct lfsck_layout             *lo     = com->lc_file_ram;
2759         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2760         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2761         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2762         struct ost_id                   *oi     = &info->lti_oi;
2763         struct lu_fid                   *fid    = &info->lti_fid;
2764         struct dt_object                *obj;
2765         const struct dt_it_ops          *iops;
2766         struct dt_it                    *di;
2767         int                              rc     = 0;
2768         ENTRY;
2769
2770         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2771                "scanning for OST%04x\n",
2772                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2773
2774         ostid_set_seq(oi, FID_SEQ_IDIF);
2775         ostid_set_id(oi, 0);
2776         rc = ostid_to_fid(fid, oi, ltd->ltd_index);
2777         if (rc != 0)
2778                 GOTO(log, rc);
2779
2780         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2781         if (unlikely(IS_ERR(obj)))
2782                 GOTO(log, rc = PTR_ERR(obj));
2783
2784         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2785         if (rc != 0)
2786                 GOTO(put, rc);
2787
2788         iops = &obj->do_index_ops->dio_it;
2789         di = iops->init(env, obj, 0, BYPASS_CAPA);
2790         if (IS_ERR(di))
2791                 GOTO(put, rc = PTR_ERR(di));
2792
2793         rc = iops->load(env, di, 0);
2794         if (rc == -ESRCH) {
2795                 /* -ESRCH means that the orphan OST-objects rbtree has been
2796                  * cleanup because of the OSS server restart or other errors. */
2797                 lo->ll_flags |= LF_INCOMPLETE;
2798                 GOTO(fini, rc);
2799         }
2800
2801         if (rc == 0)
2802                 rc = iops->next(env, di);
2803         else if (rc > 0)
2804                 rc = 0;
2805
2806         if (rc < 0)
2807                 GOTO(fini, rc);
2808
2809         if (rc > 0)
2810                 GOTO(fini, rc = 0);
2811
2812         do {
2813                 struct dt_key           *key;
2814                 struct lu_orphan_rec    *rec = &info->lti_rec;
2815
2816                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2817                     cfs_fail_val > 0) {
2818                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2819                         struct l_wait_info       lwi;
2820
2821                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2822                                           NULL, NULL);
2823                         l_wait_event(thread->t_ctl_waitq,
2824                                      !thread_is_running(thread),
2825                                      &lwi);
2826                 }
2827
2828                 key = iops->key(env, di);
2829                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2830                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2831                 if (rc == 0)
2832                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2833                                         &com->lc_fid_latest_scanned_phase2);
2834                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2835                         GOTO(fini, rc);
2836
2837                 lfsck_control_speed_by_self(com);
2838                 do {
2839                         rc = iops->next(env, di);
2840                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2841         } while (rc == 0);
2842
2843         GOTO(fini, rc);
2844
2845 fini:
2846         iops->put(env, di);
2847         iops->fini(env, di);
2848 put:
2849         lu_object_put(env, &obj->do_lu);
2850
2851 log:
2852         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2853                "scanning for OST%04x: rc = %d\n",
2854                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2855
2856         return rc > 0 ? 0 : rc;
2857 }
2858
2859 /* For the MDT-object with dangling reference, we need to repare the
2860  * inconsistency according to the LFSCK sponsor's requirement:
2861  *
2862  * 1) Keep the inconsistency there and report the inconsistency case,
2863  *    then give the chance to the application to find related issues,
2864  *    and the users can make the decision about how to handle it with
2865  *    more human knownledge. (by default)
2866  *
2867  * 2) Re-create the missed OST-object with the FID/owner information. */
2868 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2869                                         struct lfsck_component *com,
2870                                         struct lfsck_layout_req *llr,
2871                                         const struct lu_attr *pla)
2872 {
2873         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2874         struct filter_fid               *pfid   = &info->lti_new_pfid;
2875         struct dt_allocation_hint       *hint   = &info->lti_hint;
2876         struct lu_attr                  *cla    = &info->lti_la2;
2877         struct dt_object                *parent = llr->llr_parent->llo_obj;
2878         struct dt_object                *child  = llr->llr_child;
2879         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2880         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2881         struct thandle                  *handle;
2882         struct lu_buf                   *buf;
2883         struct lustre_handle             lh     = { 0 };
2884         int                              rc;
2885         bool                             create;
2886         ENTRY;
2887
2888         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2889                 create = true;
2890         else
2891                 create = false;
2892
2893         if (!create)
2894                 GOTO(log, rc = 1);
2895
2896         memset(cla, 0, sizeof(*cla));
2897         cla->la_uid = pla->la_uid;
2898         cla->la_gid = pla->la_gid;
2899         cla->la_mode = S_IFREG | 0666;
2900         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2901                         LA_ATIME | LA_MTIME | LA_CTIME;
2902
2903         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2904                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2905                               LCK_EX);
2906         if (rc != 0)
2907                 GOTO(log, rc);
2908
2909         handle = dt_trans_create(env, dev);
2910         if (IS_ERR(handle))
2911                 GOTO(unlock1, rc = PTR_ERR(handle));
2912
2913         hint->dah_parent = NULL;
2914         hint->dah_mode = 0;
2915         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2916         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2917         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2918          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2919          * parent MDT-object's layout EA. */
2920         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2921         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2922
2923         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2924         if (rc != 0)
2925                 GOTO(stop, rc);
2926
2927         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2928                                   LU_XATTR_CREATE, handle);
2929         if (rc != 0)
2930                 GOTO(stop, rc);
2931
2932         rc = dt_trans_start(env, dev, handle);
2933         if (rc != 0)
2934                 GOTO(stop, rc);
2935
2936         dt_read_lock(env, parent, 0);
2937         if (unlikely(lfsck_is_dead_obj(parent)))
2938                 GOTO(unlock2, rc = 1);
2939
2940         rc = dt_create(env, child, cla, hint, NULL, handle);
2941         if (rc != 0)
2942                 GOTO(unlock2, rc);
2943
2944         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2945                           handle, BYPASS_CAPA);
2946
2947         GOTO(unlock2, rc);
2948
2949 unlock2:
2950         dt_read_unlock(env, parent);
2951
2952 stop:
2953         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2954
2955 unlock1:
2956         lfsck_ibits_unlock(&lh, LCK_EX);
2957
2958 log:
2959         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2960                "reference for: parent "DFID", child "DFID", OST-index %u, "
2961                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2962                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2963                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2964                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2965                create ? "Create the lost OST-object as required" :
2966                         "Keep the MDT-object there by default", rc);
2967
2968         return rc;
2969 }
2970
2971 /* If the OST-object does not recognize the MDT-object as its parent, and
2972  * there is no other MDT-object claims as its parent, then just trust the
2973  * given MDT-object as its parent. So update the OST-object filter_fid. */
2974 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2975                                               struct lfsck_component *com,
2976                                               struct lfsck_layout_req *llr,
2977                                               const struct lu_attr *pla)
2978 {
2979         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2980         struct filter_fid               *pfid   = &info->lti_new_pfid;
2981         struct lu_attr                  *tla    = &info->lti_la3;
2982         struct dt_object                *parent = llr->llr_parent->llo_obj;
2983         struct dt_object                *child  = llr->llr_child;
2984         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2985         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2986         struct thandle                  *handle;
2987         struct lu_buf                   *buf;
2988         struct lustre_handle             lh     = { 0 };
2989         int                              rc;
2990         ENTRY;
2991
2992         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
2993                               MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR,
2994                               LCK_EX);
2995         if (rc != 0)
2996                 GOTO(log, rc);
2997
2998         handle = dt_trans_create(env, dev);
2999         if (IS_ERR(handle))
3000                 GOTO(unlock1, rc = PTR_ERR(handle));
3001
3002         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3003         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3004         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3005          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3006          * parent MDT-object's layout EA. */
3007         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3008         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3009
3010         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3011         if (rc != 0)
3012                 GOTO(stop, rc);
3013
3014         tla->la_valid = LA_UID | LA_GID;
3015         tla->la_uid = pla->la_uid;
3016         tla->la_gid = pla->la_gid;
3017         rc = dt_declare_attr_set(env, child, tla, handle);
3018         if (rc != 0)
3019                 GOTO(stop, rc);
3020
3021         rc = dt_trans_start(env, dev, handle);
3022         if (rc != 0)
3023                 GOTO(stop, rc);
3024
3025         dt_write_lock(env, parent, 0);
3026         if (unlikely(lfsck_is_dead_obj(parent)))
3027                 GOTO(unlock2, rc = 1);
3028
3029         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3030                           BYPASS_CAPA);
3031         if (rc != 0)
3032                 GOTO(unlock2, rc);
3033
3034         /* Get the latest parent's owner. */
3035         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3036         if (rc != 0)
3037                 GOTO(unlock2, rc);
3038
3039         tla->la_valid = LA_UID | LA_GID;
3040         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3041
3042         GOTO(unlock2, rc);
3043
3044 unlock2:
3045         dt_write_unlock(env, parent);
3046
3047 stop:
3048         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3049
3050 unlock1:
3051         lfsck_ibits_unlock(&lh, LCK_EX);
3052
3053 log:
3054         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3055                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3056                "stripe-index %u, owner %u/%u: rc = %d\n",
3057                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3058                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3059                pla->la_uid, pla->la_gid, rc);
3060
3061         return rc;
3062 }
3063
3064 /* If more than one MDT-object claims to be the OST-object's parent, and
3065  * the OST-object only recognizes one of them, then we need to generate
3066  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3067 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3068                                                    struct lfsck_component *com,
3069                                                    struct lfsck_layout_req *llr,
3070                                                    struct lu_attr *la,
3071                                                    struct lu_buf *buf)
3072 {
3073         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3074         struct dt_allocation_hint       *hint   = &info->lti_hint;
3075         struct dt_object_format         *dof    = &info->lti_dof;
3076         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3077         struct ost_id                   *oi     = &info->lti_oi;
3078         struct dt_object                *parent = llr->llr_parent->llo_obj;
3079         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3080         struct dt_object                *ch