Whamcloud - gitweb
2345dc534935b065111f10bb3e56689f99926ccd
[fs/lustre-release.git] / lustre / lfsck / lfsck_layout.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_layout.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <linux/bitops.h>
37 #include <linux/rbtree.h>
38
39 #include <lustre/lustre_idl.h>
40 #include <lu_object.h>
41 #include <dt_object.h>
42 #include <lustre_linkea.h>
43 #include <lustre_fid.h>
44 #include <lustre_lib.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <md_object.h>
48 #include <obd_class.h>
49
50 #include "lfsck_internal.h"
51
/* Magic value stored in lfsck_layout::ll_magic; lfsck_layout_load() treats
 * any other value read back from disk as a broken file to be reset. */
#define LFSCK_LAYOUT_MAGIC		0xB173AE14

/* NOTE(review): presumably the name of the layout LFSCK trace file or
 * component; its users are outside this part of the file. */
static const char lfsck_layout_name[] = "lfsck_layout";
55
56 struct lfsck_layout_seq {
57         struct list_head         lls_list;
58         __u64                    lls_seq;
59         __u64                    lls_lastid;
60         __u64                    lls_lastid_known;
61         struct dt_object        *lls_lastid_obj;
62         unsigned int             lls_dirty:1;
63 };
64
65 struct lfsck_layout_slave_target {
66         /* link into lfsck_layout_slave_data::llsd_master_list. */
67         struct list_head        llst_list;
68         /* The position for next record in the rbtree for iteration. */
69         struct lu_fid           llst_fid;
70         /* Dummy hash for iteration against the rbtree. */
71         __u64                   llst_hash;
72         __u64                   llst_gen;
73         atomic_t                llst_ref;
74         __u32                   llst_index;
75 };
76
77 struct lfsck_layout_slave_data {
78         /* list for lfsck_layout_seq */
79         struct list_head         llsd_seq_list;
80
81         /* list for the masters involve layout verification. */
82         struct list_head         llsd_master_list;
83         spinlock_t               llsd_lock;
84         __u64                    llsd_touch_gen;
85         struct dt_object        *llsd_rb_obj;
86         struct rb_root           llsd_rb_root;
87         rwlock_t                 llsd_rb_lock;
88         unsigned int             llsd_rbtree_valid:1;
89 };
90
91 struct lfsck_layout_object {
92         struct dt_object        *llo_obj;
93         struct lu_attr           llo_attr;
94         atomic_t                 llo_ref;
95         __u16                    llo_gen;
96 };
97
98 struct lfsck_layout_req {
99         struct list_head                 llr_list;
100         struct lfsck_layout_object      *llr_parent;
101         struct dt_object                *llr_child;
102         __u32                            llr_ost_idx;
103         __u32                            llr_lov_idx; /* offset in LOV EA */
104 };
105
106 struct lfsck_layout_master_data {
107         spinlock_t              llmd_lock;
108         struct list_head        llmd_req_list;
109
110         /* list for the ost targets involve layout verification. */
111         struct list_head        llmd_ost_list;
112
113         /* list for the ost targets in phase1 scanning. */
114         struct list_head        llmd_ost_phase1_list;
115
116         /* list for the ost targets in phase1 scanning. */
117         struct list_head        llmd_ost_phase2_list;
118
119         /* list for the mdt targets involve layout verification. */
120         struct list_head        llmd_mdt_list;
121
122         /* list for the mdt targets in phase1 scanning. */
123         struct list_head        llmd_mdt_phase1_list;
124
125         /* list for the mdt targets in phase1 scanning. */
126         struct list_head        llmd_mdt_phase2_list;
127
128         struct ptlrpc_thread    llmd_thread;
129         __u32                   llmd_touch_gen;
130         int                     llmd_prefetched;
131         int                     llmd_assistant_status;
132         int                     llmd_post_result;
133         unsigned int            llmd_to_post:1,
134                                 llmd_to_double_scan:1,
135                                 llmd_in_double_scan:1,
136                                 llmd_exit:1;
137 };
138
/* Argument bundle carrying an export, a component and a slave target.
 * NOTE(review): presumably attached to an asynchronous request issued by
 * the slave; the users are outside this part of the file. */
struct lfsck_layout_slave_async_args {
	struct obd_export			*llsaa_exp;
	struct lfsck_component			*llsaa_com;
	struct lfsck_layout_slave_target	*llsaa_llst;
};
144
145 static struct lfsck_layout_object *
146 lfsck_layout_object_init(const struct lu_env *env, struct dt_object *obj,
147                          __u16 gen)
148 {
149         struct lfsck_layout_object *llo;
150         int                         rc;
151
152         OBD_ALLOC_PTR(llo);
153         if (llo == NULL)
154                 return ERR_PTR(-ENOMEM);
155
156         rc = dt_attr_get(env, obj, &llo->llo_attr, BYPASS_CAPA);
157         if (rc != 0) {
158                 OBD_FREE_PTR(llo);
159
160                 return ERR_PTR(rc);
161         }
162
163         lu_object_get(&obj->do_lu);
164         llo->llo_obj = obj;
165         /* The gen can be used to check whether some others have changed the
166          * file layout after LFSCK pre-fetching but before real verification. */
167         llo->llo_gen = gen;
168         atomic_set(&llo->llo_ref, 1);
169
170         return llo;
171 }
172
173 static inline void
174 lfsck_layout_llst_put(struct lfsck_layout_slave_target *llst)
175 {
176         if (atomic_dec_and_test(&llst->llst_ref)) {
177                 LASSERT(list_empty(&llst->llst_list));
178
179                 OBD_FREE_PTR(llst);
180         }
181 }
182
183 static inline int
184 lfsck_layout_llst_add(struct lfsck_layout_slave_data *llsd, __u32 index)
185 {
186         struct lfsck_layout_slave_target *llst;
187         struct lfsck_layout_slave_target *tmp;
188         int                               rc   = 0;
189
190         OBD_ALLOC_PTR(llst);
191         if (llst == NULL)
192                 return -ENOMEM;
193
194         INIT_LIST_HEAD(&llst->llst_list);
195         llst->llst_gen = 0;
196         llst->llst_index = index;
197         atomic_set(&llst->llst_ref, 1);
198
199         spin_lock(&llsd->llsd_lock);
200         list_for_each_entry(tmp, &llsd->llsd_master_list, llst_list) {
201                 if (tmp->llst_index == index) {
202                         rc = -EALREADY;
203                         break;
204                 }
205         }
206         if (rc == 0)
207                 list_add_tail(&llst->llst_list, &llsd->llsd_master_list);
208         spin_unlock(&llsd->llsd_lock);
209
210         if (rc != 0)
211                 OBD_FREE_PTR(llst);
212
213         return rc;
214 }
215
216 static inline void
217 lfsck_layout_llst_del(struct lfsck_layout_slave_data *llsd,
218                       struct lfsck_layout_slave_target *llst)
219 {
220         bool del = false;
221
222         spin_lock(&llsd->llsd_lock);
223         if (!list_empty(&llst->llst_list)) {
224                 list_del_init(&llst->llst_list);
225                 del = true;
226         }
227         spin_unlock(&llsd->llsd_lock);
228
229         if (del)
230                 lfsck_layout_llst_put(llst);
231 }
232
233 static inline struct lfsck_layout_slave_target *
234 lfsck_layout_llst_find_and_del(struct lfsck_layout_slave_data *llsd,
235                                __u32 index, bool unlink)
236 {
237         struct lfsck_layout_slave_target *llst;
238
239         spin_lock(&llsd->llsd_lock);
240         list_for_each_entry(llst, &llsd->llsd_master_list, llst_list) {
241                 if (llst->llst_index == index) {
242                         if (unlink)
243                                 list_del_init(&llst->llst_list);
244                         else
245                                 atomic_inc(&llst->llst_ref);
246                         spin_unlock(&llsd->llsd_lock);
247
248                         return llst;
249                 }
250         }
251         spin_unlock(&llsd->llsd_lock);
252
253         return NULL;
254 }
255
256 static inline void lfsck_layout_object_put(const struct lu_env *env,
257                                            struct lfsck_layout_object *llo)
258 {
259         if (atomic_dec_and_test(&llo->llo_ref)) {
260                 lfsck_object_put(env, llo->llo_obj);
261                 OBD_FREE_PTR(llo);
262         }
263 }
264
265 static struct lfsck_layout_req *
266 lfsck_layout_req_init(struct lfsck_layout_object *parent,
267                       struct dt_object *child, __u32 ost_idx, __u32 lov_idx)
268 {
269         struct lfsck_layout_req *llr;
270
271         OBD_ALLOC_PTR(llr);
272         if (llr == NULL)
273                 return ERR_PTR(-ENOMEM);
274
275         INIT_LIST_HEAD(&llr->llr_list);
276         atomic_inc(&parent->llo_ref);
277         llr->llr_parent = parent;
278         llr->llr_child = child;
279         llr->llr_ost_idx = ost_idx;
280         llr->llr_lov_idx = lov_idx;
281
282         return llr;
283 }
284
285 static inline void lfsck_layout_req_fini(const struct lu_env *env,
286                                          struct lfsck_layout_req *llr)
287 {
288         lu_object_put(env, &llr->llr_child->do_lu);
289         lfsck_layout_object_put(env, llr->llr_parent);
290         OBD_FREE_PTR(llr);
291 }
292
293 static inline bool lfsck_layout_req_empty(struct lfsck_layout_master_data *llmd)
294 {
295         bool empty = false;
296
297         spin_lock(&llmd->llmd_lock);
298         if (list_empty(&llmd->llmd_req_list))
299                 empty = true;
300         spin_unlock(&llmd->llmd_lock);
301
302         return empty;
303 }
304
305 static int lfsck_layout_get_lovea(const struct lu_env *env,
306                                   struct dt_object *obj,
307                                   struct lu_buf *buf, ssize_t *buflen)
308 {
309         int rc;
310
311 again:
312         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LOV, BYPASS_CAPA);
313         if (rc == -ERANGE) {
314                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV,
315                                   BYPASS_CAPA);
316                 if (rc <= 0)
317                         return rc;
318
319                 lu_buf_realloc(buf, rc);
320                 if (buflen != NULL)
321                         *buflen = buf->lb_len;
322
323                 if (buf->lb_buf == NULL)
324                         return -ENOMEM;
325
326                 goto again;
327         }
328
329         if (rc == -ENODATA)
330                 rc = 0;
331
332         if (rc <= 0)
333                 return rc;
334
335         if (unlikely(buf->lb_buf == NULL)) {
336                 lu_buf_alloc(buf, rc);
337                 if (buflen != NULL)
338                         *buflen = buf->lb_len;
339
340                 if (buf->lb_buf == NULL)
341                         return -ENOMEM;
342
343                 goto again;
344         }
345
346         return rc;
347 }
348
349 static int lfsck_layout_verify_header(struct lov_mds_md_v1 *lmm)
350 {
351         __u32 magic;
352         __u32 pattern;
353
354         magic = le32_to_cpu(lmm->lmm_magic);
355         /* If magic crashed, keep it there. Sometime later, during OST-object
356          * orphan handling, if some OST-object(s) back-point to it, it can be
357          * verified and repaired. */
358         if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) {
359                 struct ost_id   oi;
360                 int             rc;
361
362                 lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi);
363                 if ((magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC)
364                         rc = -EOPNOTSUPP;
365                 else
366                         rc = -EINVAL;
367
368                 CDEBUG(D_LFSCK, "%s LOV EA magic %u on "DOSTID"\n",
369                        rc == -EINVAL ? "Unknown" : "Unsupported",
370                        magic, POSTID(&oi));
371
372                 return rc;
373         }
374
375         pattern = le32_to_cpu(lmm->lmm_pattern);
376         /* XXX: currently, we only support LOV_PATTERN_RAID0. */
377         if (lov_pattern(pattern) != LOV_PATTERN_RAID0) {
378                 struct ost_id oi;
379
380                 lmm_oi_cpu_to_le(&oi, &lmm->lmm_oi);
381                 CDEBUG(D_LFSCK, "Unsupported LOV EA pattern %u on "DOSTID"\n",
382                        pattern, POSTID(&oi));
383
384                 return -EOPNOTSUPP;
385         }
386
387         return 0;
388 }
389
/* Size, in bytes, of each bitmap hanging off an lfsck_rbtree_node. */
#define LFSCK_RBTREE_BITMAP_SIZE	PAGE_CACHE_SIZE
/* Number of bits in one bitmap (bytes * 8), i.e. how many consecutive
 * OIDs a single lfsck_rbtree_node covers. */
#define LFSCK_RBTREE_BITMAP_WIDTH	(LFSCK_RBTREE_BITMAP_SIZE << 3)
/* Mask giving the bit offset of an OID inside its node; this assumes the
 * width is a power of two. */
#define LFSCK_RBTREE_BITMAP_MASK	(LFSCK_RBTREE_BITMAP_WIDTH - 1)
393
394 struct lfsck_rbtree_node {
395         struct rb_node   lrn_node;
396         __u64            lrn_seq;
397         __u32            lrn_first_oid;
398         atomic_t         lrn_known_count;
399         atomic_t         lrn_accessed_count;
400         void            *lrn_known_bitmap;
401         void            *lrn_accessed_bitmap;
402 };
403
404 static inline int lfsck_rbtree_cmp(struct lfsck_rbtree_node *lrn,
405                                    __u64 seq, __u32 oid)
406 {
407         if (seq < lrn->lrn_seq)
408                 return -1;
409
410         if (seq > lrn->lrn_seq)
411                 return 1;
412
413         if (oid < lrn->lrn_first_oid)
414                 return -1;
415
416         if (oid - lrn->lrn_first_oid >= LFSCK_RBTREE_BITMAP_WIDTH)
417                 return 1;
418
419         return 0;
420 }
421
422 /* The caller should hold llsd->llsd_rb_lock. */
423 static struct lfsck_rbtree_node *
424 lfsck_rbtree_search(struct lfsck_layout_slave_data *llsd,
425                     const struct lu_fid *fid, bool *exact)
426 {
427         struct rb_node           *node  = llsd->llsd_rb_root.rb_node;
428         struct rb_node           *prev  = NULL;
429         struct lfsck_rbtree_node *lrn   = NULL;
430         int                       rc    = 0;
431
432         if (exact != NULL)
433                 *exact = true;
434
435         while (node != NULL) {
436                 prev = node;
437                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
438                 rc = lfsck_rbtree_cmp(lrn, fid_seq(fid), fid_oid(fid));
439                 if (rc < 0)
440                         node = node->rb_left;
441                 else if (rc > 0)
442                         node = node->rb_right;
443                 else
444                         return lrn;
445         }
446
447         if (exact == NULL)
448                 return NULL;
449
450         /* If there is no exactly matched one, then to the next valid one. */
451         *exact = false;
452
453         /* The rbtree is empty. */
454         if (rc == 0)
455                 return NULL;
456
457         if (rc < 0)
458                 return lrn;
459
460         node = rb_next(prev);
461
462         /* The end of the rbtree. */
463         if (node == NULL)
464                 return NULL;
465
466         lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
467
468         return lrn;
469 }
470
471 static struct lfsck_rbtree_node *lfsck_rbtree_new(const struct lu_env *env,
472                                                   const struct lu_fid *fid)
473 {
474         struct lfsck_rbtree_node *lrn;
475
476         OBD_ALLOC_PTR(lrn);
477         if (lrn == NULL)
478                 return ERR_PTR(-ENOMEM);
479
480         OBD_ALLOC(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
481         if (lrn->lrn_known_bitmap == NULL) {
482                 OBD_FREE_PTR(lrn);
483
484                 return ERR_PTR(-ENOMEM);
485         }
486
487         OBD_ALLOC(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
488         if (lrn->lrn_accessed_bitmap == NULL) {
489                 OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
490                 OBD_FREE_PTR(lrn);
491
492                 return ERR_PTR(-ENOMEM);
493         }
494
495         RB_CLEAR_NODE(&lrn->lrn_node);
496         lrn->lrn_seq = fid_seq(fid);
497         lrn->lrn_first_oid = fid_oid(fid) & ~LFSCK_RBTREE_BITMAP_MASK;
498         atomic_set(&lrn->lrn_known_count, 0);
499         atomic_set(&lrn->lrn_accessed_count, 0);
500
501         return lrn;
502 }
503
504 static void lfsck_rbtree_free(struct lfsck_rbtree_node *lrn)
505 {
506         OBD_FREE(lrn->lrn_accessed_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
507         OBD_FREE(lrn->lrn_known_bitmap, LFSCK_RBTREE_BITMAP_SIZE);
508         OBD_FREE_PTR(lrn);
509 }
510
511 /* The caller should hold lock. */
512 static struct lfsck_rbtree_node *
513 lfsck_rbtree_insert(struct lfsck_layout_slave_data *llsd,
514                     struct lfsck_rbtree_node *lrn)
515 {
516         struct rb_node           **pos    = &llsd->llsd_rb_root.rb_node;
517         struct rb_node            *parent = NULL;
518         struct lfsck_rbtree_node  *tmp;
519         int                        rc;
520
521         while (*pos != NULL) {
522                 parent = *pos;
523                 tmp = rb_entry(parent, struct lfsck_rbtree_node, lrn_node);
524                 rc = lfsck_rbtree_cmp(tmp, lrn->lrn_seq, lrn->lrn_first_oid);
525                 if (rc < 0)
526                         pos = &(*pos)->rb_left;
527                 else if (rc > 0)
528                         pos = &(*pos)->rb_right;
529                 else
530                         return tmp;
531         }
532
533         rb_link_node(&lrn->lrn_node, parent, pos);
534         rb_insert_color(&lrn->lrn_node, &llsd->llsd_rb_root);
535
536         return lrn;
537 }
538
539 extern const struct dt_index_operations lfsck_orphan_index_ops;
540
541 static int lfsck_rbtree_setup(const struct lu_env *env,
542                               struct lfsck_component *com)
543 {
544         struct lu_fid                   *fid    = &lfsck_env_info(env)->lti_fid;
545         struct lfsck_instance           *lfsck  = com->lc_lfsck;
546         struct dt_device                *dev    = lfsck->li_bottom;
547         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
548         struct dt_object                *obj;
549
550         fid->f_seq = FID_SEQ_LAYOUT_RBTREE;
551         fid->f_oid = lfsck_dev_idx(dev);
552         fid->f_ver = 0;
553         obj = dt_locate(env, dev, fid);
554         if (IS_ERR(obj))
555                 RETURN(PTR_ERR(obj));
556
557         /* Generate an in-RAM object to stand for the layout rbtree.
558          * Scanning the layout rbtree will be via the iteration over
559          * the object. In the future, the rbtree may be written onto
560          * disk with the object.
561          *
562          * Mark the object to be as exist. */
563         obj->do_lu.lo_header->loh_attr |= LOHA_EXISTS;
564         obj->do_index_ops = &lfsck_orphan_index_ops;
565         llsd->llsd_rb_obj = obj;
566         llsd->llsd_rbtree_valid = 1;
567         dev->dd_record_fid_accessed = 1;
568
569         CDEBUG(D_LFSCK, "%s: layout LFSCK init OST-objects accessing bitmap\n",
570                lfsck_lfsck2name(lfsck));
571
572         return 0;
573 }
574
575 static void lfsck_rbtree_cleanup(const struct lu_env *env,
576                                  struct lfsck_component *com)
577 {
578         struct lfsck_instance           *lfsck = com->lc_lfsck;
579         struct lfsck_layout_slave_data  *llsd  = com->lc_data;
580         struct rb_node                  *node  = rb_first(&llsd->llsd_rb_root);
581         struct rb_node                  *next;
582         struct lfsck_rbtree_node        *lrn;
583
584         lfsck->li_bottom->dd_record_fid_accessed = 0;
585         /* Invalid the rbtree, then no others will use it. */
586         write_lock(&llsd->llsd_rb_lock);
587         llsd->llsd_rbtree_valid = 0;
588         write_unlock(&llsd->llsd_rb_lock);
589
590         while (node != NULL) {
591                 next = rb_next(node);
592                 lrn = rb_entry(node, struct lfsck_rbtree_node, lrn_node);
593                 rb_erase(node, &llsd->llsd_rb_root);
594                 lfsck_rbtree_free(lrn);
595                 node = next;
596         }
597
598         if (llsd->llsd_rb_obj != NULL) {
599                 lu_object_put(env, &llsd->llsd_rb_obj->do_lu);
600                 llsd->llsd_rb_obj = NULL;
601         }
602
603         CDEBUG(D_LFSCK, "%s: layout LFSCK fini OST-objects accessing bitmap\n",
604                lfsck_lfsck2name(lfsck));
605 }
606
607 static void lfsck_rbtree_update_bitmap(const struct lu_env *env,
608                                        struct lfsck_component *com,
609                                        const struct lu_fid *fid,
610                                        bool accessed)
611 {
612         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
613         struct lfsck_rbtree_node        *lrn;
614         bool                             insert = false;
615         int                              idx;
616         int                              rc     = 0;
617         ENTRY;
618
619         if (unlikely(!fid_is_sane(fid) || fid_is_last_id(fid)))
620                 RETURN_EXIT;
621
622         if (!fid_is_idif(fid) && !fid_is_norm(fid))
623                 RETURN_EXIT;
624
625         read_lock(&llsd->llsd_rb_lock);
626         if (!llsd->llsd_rbtree_valid)
627                 GOTO(unlock, rc = 0);
628
629         lrn = lfsck_rbtree_search(llsd, fid, NULL);
630         if (lrn == NULL) {
631                 struct lfsck_rbtree_node *tmp;
632
633                 LASSERT(!insert);
634
635                 read_unlock(&llsd->llsd_rb_lock);
636                 tmp = lfsck_rbtree_new(env, fid);
637                 if (IS_ERR(tmp))
638                         GOTO(out, rc = PTR_ERR(tmp));
639
640                 insert = true;
641                 write_lock(&llsd->llsd_rb_lock);
642                 if (!llsd->llsd_rbtree_valid) {
643                         lfsck_rbtree_free(tmp);
644                         GOTO(unlock, rc = 0);
645                 }
646
647                 lrn = lfsck_rbtree_insert(llsd, tmp);
648                 if (lrn != tmp)
649                         lfsck_rbtree_free(tmp);
650         }
651
652         idx = fid_oid(fid) & LFSCK_RBTREE_BITMAP_MASK;
653         /* Any accessed object must be a known object. */
654         if (!test_and_set_bit(idx, lrn->lrn_known_bitmap))
655                 atomic_inc(&lrn->lrn_known_count);
656         if (accessed && !test_and_set_bit(idx, lrn->lrn_accessed_bitmap))
657                 atomic_inc(&lrn->lrn_accessed_count);
658
659         GOTO(unlock, rc = 0);
660
661 unlock:
662         if (insert)
663                 write_unlock(&llsd->llsd_rb_lock);
664         else
665                 read_unlock(&llsd->llsd_rb_lock);
666 out:
667         if (rc != 0 && accessed) {
668                 struct lfsck_layout *lo = com->lc_file_ram;
669
670                 CDEBUG(D_LFSCK, "%s: fail to update OST-objects accessing "
671                        "bitmap, and will cause incorrect LFSCK OST-object "
672                        "handling, so disable it to cancel orphan handling "
673                        "for related device. rc = %d\n",
674                        lfsck_lfsck2name(com->lc_lfsck), rc);
675
676                 lo->ll_flags |= LF_INCOMPLETE;
677                 lfsck_rbtree_cleanup(env, com);
678         }
679 }
680
681 static void lfsck_layout_le_to_cpu(struct lfsck_layout *des,
682                                    const struct lfsck_layout *src)
683 {
684         int i;
685
686         des->ll_magic = le32_to_cpu(src->ll_magic);
687         des->ll_status = le32_to_cpu(src->ll_status);
688         des->ll_flags = le32_to_cpu(src->ll_flags);
689         des->ll_success_count = le32_to_cpu(src->ll_success_count);
690         des->ll_run_time_phase1 = le32_to_cpu(src->ll_run_time_phase1);
691         des->ll_run_time_phase2 = le32_to_cpu(src->ll_run_time_phase2);
692         des->ll_time_last_complete = le64_to_cpu(src->ll_time_last_complete);
693         des->ll_time_latest_start = le64_to_cpu(src->ll_time_latest_start);
694         des->ll_time_last_checkpoint =
695                                 le64_to_cpu(src->ll_time_last_checkpoint);
696         des->ll_pos_latest_start = le64_to_cpu(src->ll_pos_latest_start);
697         des->ll_pos_last_checkpoint = le64_to_cpu(src->ll_pos_last_checkpoint);
698         des->ll_pos_first_inconsistent =
699                         le64_to_cpu(src->ll_pos_first_inconsistent);
700         des->ll_objs_checked_phase1 = le64_to_cpu(src->ll_objs_checked_phase1);
701         des->ll_objs_failed_phase1 = le64_to_cpu(src->ll_objs_failed_phase1);
702         des->ll_objs_checked_phase2 = le64_to_cpu(src->ll_objs_checked_phase2);
703         des->ll_objs_failed_phase2 = le64_to_cpu(src->ll_objs_failed_phase2);
704         for (i = 0; i < LLIT_MAX; i++)
705                 des->ll_objs_repaired[i] =
706                                 le64_to_cpu(src->ll_objs_repaired[i]);
707         des->ll_objs_skipped = le64_to_cpu(src->ll_objs_skipped);
708 }
709
710 static void lfsck_layout_cpu_to_le(struct lfsck_layout *des,
711                                    const struct lfsck_layout *src)
712 {
713         int i;
714
715         des->ll_magic = cpu_to_le32(src->ll_magic);
716         des->ll_status = cpu_to_le32(src->ll_status);
717         des->ll_flags = cpu_to_le32(src->ll_flags);
718         des->ll_success_count = cpu_to_le32(src->ll_success_count);
719         des->ll_run_time_phase1 = cpu_to_le32(src->ll_run_time_phase1);
720         des->ll_run_time_phase2 = cpu_to_le32(src->ll_run_time_phase2);
721         des->ll_time_last_complete = cpu_to_le64(src->ll_time_last_complete);
722         des->ll_time_latest_start = cpu_to_le64(src->ll_time_latest_start);
723         des->ll_time_last_checkpoint =
724                                 cpu_to_le64(src->ll_time_last_checkpoint);
725         des->ll_pos_latest_start = cpu_to_le64(src->ll_pos_latest_start);
726         des->ll_pos_last_checkpoint = cpu_to_le64(src->ll_pos_last_checkpoint);
727         des->ll_pos_first_inconsistent =
728                         cpu_to_le64(src->ll_pos_first_inconsistent);
729         des->ll_objs_checked_phase1 = cpu_to_le64(src->ll_objs_checked_phase1);
730         des->ll_objs_failed_phase1 = cpu_to_le64(src->ll_objs_failed_phase1);
731         des->ll_objs_checked_phase2 = cpu_to_le64(src->ll_objs_checked_phase2);
732         des->ll_objs_failed_phase2 = cpu_to_le64(src->ll_objs_failed_phase2);
733         for (i = 0; i < LLIT_MAX; i++)
734                 des->ll_objs_repaired[i] =
735                                 cpu_to_le64(src->ll_objs_repaired[i]);
736         des->ll_objs_skipped = cpu_to_le64(src->ll_objs_skipped);
737 }
738
739 /**
740  * \retval +ve: the lfsck_layout is broken, the caller should reset it.
741  * \retval 0: succeed.
742  * \retval -ve: failed cases.
743  */
744 static int lfsck_layout_load(const struct lu_env *env,
745                              struct lfsck_component *com)
746 {
747         struct lfsck_layout             *lo     = com->lc_file_ram;
748         const struct dt_body_operations *dbo    = com->lc_obj->do_body_ops;
749         ssize_t                          size   = com->lc_file_size;
750         loff_t                           pos    = 0;
751         int                              rc;
752
753         rc = dbo->dbo_read(env, com->lc_obj,
754                            lfsck_buf_get(env, com->lc_file_disk, size), &pos,
755                            BYPASS_CAPA);
756         if (rc == 0) {
757                 return -ENOENT;
758         } else if (rc < 0) {
759                 CDEBUG(D_LFSCK, "%s: failed to load lfsck_layout: rc = %d\n",
760                        lfsck_lfsck2name(com->lc_lfsck), rc);
761                 return rc;
762         } else if (rc != size) {
763                 CDEBUG(D_LFSCK, "%s: lfsck_layout size %u != %u; reset it\n",
764                        lfsck_lfsck2name(com->lc_lfsck), rc, (unsigned int)size);
765                 return 1;
766         }
767
768         lfsck_layout_le_to_cpu(lo, com->lc_file_disk);
769         if (lo->ll_magic != LFSCK_LAYOUT_MAGIC) {
770                 CDEBUG(D_LFSCK, "%s: invalid lfsck_layout magic %#x != %#x, "
771                        "to be reset\n", lfsck_lfsck2name(com->lc_lfsck),
772                        lo->ll_magic, LFSCK_LAYOUT_MAGIC);
773                 return 1;
774         }
775
776         return 0;
777 }
778
779 static int lfsck_layout_store(const struct lu_env *env,
780                               struct lfsck_component *com)
781 {
782         struct dt_object         *obj           = com->lc_obj;
783         struct lfsck_instance    *lfsck         = com->lc_lfsck;
784         struct lfsck_layout      *lo            = com->lc_file_disk;
785         struct thandle           *handle;
786         ssize_t                   size          = com->lc_file_size;
787         loff_t                    pos           = 0;
788         int                       rc;
789         ENTRY;
790
791         lfsck_layout_cpu_to_le(lo, com->lc_file_ram);
792         handle = dt_trans_create(env, lfsck->li_bottom);
793         if (IS_ERR(handle))
794                 GOTO(log, rc = PTR_ERR(handle));
795
796         rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
797                                      pos, handle);
798         if (rc != 0)
799                 GOTO(out, rc);
800
801         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
802         if (rc != 0)
803                 GOTO(out, rc);
804
805         rc = dt_record_write(env, obj, lfsck_buf_get(env, lo, size), &pos,
806                              handle);
807
808         GOTO(out, rc);
809
810 out:
811         dt_trans_stop(env, lfsck->li_bottom, handle);
812
813 log:
814         if (rc != 0)
815                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_layout: rc = %d\n",
816                        lfsck_lfsck2name(lfsck), rc);
817         return rc;
818 }
819
820 static int lfsck_layout_init(const struct lu_env *env,
821                              struct lfsck_component *com)
822 {
823         struct lfsck_layout *lo = com->lc_file_ram;
824         int rc;
825
826         memset(lo, 0, com->lc_file_size);
827         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
828         lo->ll_status = LS_INIT;
829         down_write(&com->lc_sem);
830         rc = lfsck_layout_store(env, com);
831         up_write(&com->lc_sem);
832
833         return rc;
834 }
835
836 static int fid_is_for_ostobj(const struct lu_env *env, struct dt_device *dt,
837                              struct dt_object *obj, const struct lu_fid *fid)
838 {
839         struct seq_server_site  *ss     = lu_site2seq(dt->dd_lu_dev.ld_site);
840         struct lu_seq_range      range  = { 0 };
841         struct lustre_mdt_attrs *lma;
842         int                      rc;
843
844         fld_range_set_any(&range);
845         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), &range);
846         if (rc == 0) {
847                 if (fld_range_is_ost(&range))
848                         return 1;
849
850                 return 0;
851         }
852
853         lma = &lfsck_env_info(env)->lti_lma;
854         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, lma, sizeof(*lma)),
855                           XATTR_NAME_LMA, BYPASS_CAPA);
856         if (rc == sizeof(*lma)) {
857                 lustre_lma_swab(lma);
858
859                 return lma->lma_compat & LMAC_FID_ON_OST ? 1 : 0;
860         }
861
862         rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_FID, BYPASS_CAPA);
863
864         return rc > 0;
865 }
866
867 static struct lfsck_layout_seq *
868 lfsck_layout_seq_lookup(struct lfsck_layout_slave_data *llsd, __u64 seq)
869 {
870         struct lfsck_layout_seq *lls;
871
872         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
873                 if (lls->lls_seq == seq)
874                         return lls;
875
876                 if (lls->lls_seq > seq)
877                         return NULL;
878         }
879
880         return NULL;
881 }
882
883 static void
884 lfsck_layout_seq_insert(struct lfsck_layout_slave_data *llsd,
885                         struct lfsck_layout_seq *lls)
886 {
887         struct lfsck_layout_seq *tmp;
888         struct list_head        *pos = &llsd->llsd_seq_list;
889
890         list_for_each_entry(tmp, &llsd->llsd_seq_list, lls_list) {
891                 if (lls->lls_seq < tmp->lls_seq) {
892                         pos = &tmp->lls_list;
893                         break;
894                 }
895         }
896         list_add_tail(&lls->lls_list, pos);
897 }
898
899 static int
900 lfsck_layout_lastid_create(const struct lu_env *env,
901                            struct lfsck_instance *lfsck,
902                            struct dt_object *obj)
903 {
904         struct lfsck_thread_info *info   = lfsck_env_info(env);
905         struct lu_attr           *la     = &info->lti_la;
906         struct dt_object_format  *dof    = &info->lti_dof;
907         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
908         struct dt_device         *dt     = lfsck->li_bottom;
909         struct thandle           *th;
910         __u64                     lastid = 0;
911         loff_t                    pos    = 0;
912         int                       rc;
913         ENTRY;
914
915         if (bk->lb_param & LPF_DRYRUN)
916                 return 0;
917
918         memset(la, 0, sizeof(*la));
919         la->la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
920         la->la_valid = LA_MODE | LA_UID | LA_GID;
921         dof->dof_type = dt_mode_to_dft(S_IFREG);
922
923         th = dt_trans_create(env, dt);
924         if (IS_ERR(th))
925                 GOTO(log, rc = PTR_ERR(th));
926
927         rc = dt_declare_create(env, obj, la, NULL, dof, th);
928         if (rc != 0)
929                 GOTO(stop, rc);
930
931         rc = dt_declare_record_write(env, obj,
932                                      lfsck_buf_get(env, &lastid,
933                                                    sizeof(lastid)),
934                                      pos, th);
935         if (rc != 0)
936                 GOTO(stop, rc);
937
938         rc = dt_trans_start_local(env, dt, th);
939         if (rc != 0)
940                 GOTO(stop, rc);
941
942         dt_write_lock(env, obj, 0);
943         if (likely(!dt_object_exists(obj))) {
944                 rc = dt_create(env, obj, la, NULL, dof, th);
945                 if (rc == 0)
946                         rc = dt_record_write(env, obj,
947                                 lfsck_buf_get(env, &lastid, sizeof(lastid)),
948                                 &pos, th);
949         }
950         dt_write_unlock(env, obj);
951
952         GOTO(stop, rc);
953
954 stop:
955         dt_trans_stop(env, dt, th);
956
957 log:
958         CDEBUG(D_LFSCK, "%s: layout LFSCK will create LAST_ID for <seq> "
959                LPX64": rc = %d\n",
960                lfsck_lfsck2name(lfsck), fid_seq(lfsck_dto2fid(obj)), rc);
961
962         return rc;
963 }
964
965 static int
966 lfsck_layout_lastid_reload(const struct lu_env *env,
967                            struct lfsck_component *com,
968                            struct lfsck_layout_seq *lls)
969 {
970         __u64   lastid;
971         loff_t  pos     = 0;
972         int     rc;
973
974         dt_read_lock(env, lls->lls_lastid_obj, 0);
975         rc = dt_record_read(env, lls->lls_lastid_obj,
976                             lfsck_buf_get(env, &lastid, sizeof(lastid)), &pos);
977         dt_read_unlock(env, lls->lls_lastid_obj);
978         if (unlikely(rc != 0))
979                 return rc;
980
981         lastid = le64_to_cpu(lastid);
982         if (lastid < lls->lls_lastid_known) {
983                 struct lfsck_instance   *lfsck  = com->lc_lfsck;
984                 struct lfsck_layout     *lo     = com->lc_file_ram;
985
986                 lls->lls_lastid = lls->lls_lastid_known;
987                 lls->lls_dirty = 1;
988                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
989                         LASSERT(lfsck->li_out_notify != NULL);
990
991                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
992                                              LE_LASTID_REBUILDING);
993                         lo->ll_flags |= LF_CRASHED_LASTID;
994                 }
995         } else if (lastid >= lls->lls_lastid) {
996                 lls->lls_lastid = lastid;
997                 lls->lls_dirty = 0;
998         }
999
1000         return 0;
1001 }
1002
1003 static int
1004 lfsck_layout_lastid_store(const struct lu_env *env,
1005                           struct lfsck_component *com)
1006 {
1007         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1008         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
1009         struct dt_device                *dt     = lfsck->li_bottom;
1010         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
1011         struct lfsck_layout_seq         *lls;
1012         struct thandle                  *th;
1013         __u64                            lastid;
1014         int                              rc     = 0;
1015         int                              rc1    = 0;
1016
1017         list_for_each_entry(lls, &llsd->llsd_seq_list, lls_list) {
1018                 loff_t pos = 0;
1019
1020                 /* XXX: Add the code back if we really found related
1021                  *      inconsistent cases in the future. */
1022 #if 0
1023                 if (!lls->lls_dirty) {
1024                         /* In OFD, before the pre-creation, the LAST_ID
1025                          * file will be updated firstly, which may hide
1026                          * some potential crashed cases. For example:
1027                          *
1028                          * The old obj1's ID is higher than old LAST_ID
1029                          * but lower than the new LAST_ID, but the LFSCK
1030                          * have not touch the obj1 until the OFD updated
1031                          * the LAST_ID. So the LFSCK does not regard it
1032                          * as crashed case. But when OFD does not create
1033                          * successfully, it will set the LAST_ID as the
1034                          * real created objects' ID, then LFSCK needs to
1035                          * found related inconsistency. */
1036                         rc = lfsck_layout_lastid_reload(env, com, lls);
1037                         if (likely(!lls->lls_dirty))
1038                                 continue;
1039                 }
1040 #endif
1041
1042                 CDEBUG(D_LFSCK, "%s: layout LFSCK will sync the LAST_ID for "
1043                        "<seq> "LPX64" as <oid> "LPU64"\n",
1044                        lfsck_lfsck2name(lfsck), lls->lls_seq, lls->lls_lastid);
1045
1046                 if (bk->lb_param & LPF_DRYRUN) {
1047                         lls->lls_dirty = 0;
1048                         continue;
1049                 }
1050
1051                 th = dt_trans_create(env, dt);
1052                 if (IS_ERR(th)) {
1053                         rc1 = PTR_ERR(th);
1054                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1055                                "the LAST_ID for <seq> "LPX64"(1): rc = %d\n",
1056                                lfsck_lfsck2name(com->lc_lfsck),
1057                                lls->lls_seq, rc1);
1058                         continue;
1059                 }
1060
1061                 lastid = cpu_to_le64(lls->lls_lastid);
1062                 rc = dt_declare_record_write(env, lls->lls_lastid_obj,
1063                                              lfsck_buf_get(env, &lastid,
1064                                                            sizeof(lastid)),
1065                                              pos, th);
1066                 if (rc != 0)
1067                         goto stop;
1068
1069                 rc = dt_trans_start_local(env, dt, th);
1070                 if (rc != 0)
1071                         goto stop;
1072
1073                 dt_write_lock(env, lls->lls_lastid_obj, 0);
1074                 rc = dt_record_write(env, lls->lls_lastid_obj,
1075                                      lfsck_buf_get(env, &lastid,
1076                                      sizeof(lastid)), &pos, th);
1077                 dt_write_unlock(env, lls->lls_lastid_obj);
1078                 if (rc == 0)
1079                         lls->lls_dirty = 0;
1080
1081 stop:
1082                 dt_trans_stop(env, dt, th);
1083                 if (rc != 0) {
1084                         rc1 = rc;
1085                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to store "
1086                                "the LAST_ID for <seq> "LPX64"(2): rc = %d\n",
1087                                lfsck_lfsck2name(com->lc_lfsck),
1088                                lls->lls_seq, rc1);
1089                 }
1090         }
1091
1092         return rc1;
1093 }
1094
1095 static int
1096 lfsck_layout_lastid_load(const struct lu_env *env,
1097                          struct lfsck_component *com,
1098                          struct lfsck_layout_seq *lls)
1099 {
1100         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1101         struct lfsck_layout     *lo     = com->lc_file_ram;
1102         struct lu_fid           *fid    = &lfsck_env_info(env)->lti_fid;
1103         struct dt_object        *obj;
1104         loff_t                   pos    = 0;
1105         int                      rc;
1106         ENTRY;
1107
1108         lu_last_id_fid(fid, lls->lls_seq, lfsck_dev_idx(lfsck->li_bottom));
1109         obj = dt_locate(env, lfsck->li_bottom, fid);
1110         if (IS_ERR(obj))
1111                 RETURN(PTR_ERR(obj));
1112
1113         /* LAST_ID crashed, to be rebuilt */
1114         if (!dt_object_exists(obj)) {
1115                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
1116                         LASSERT(lfsck->li_out_notify != NULL);
1117
1118                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1119                                              LE_LASTID_REBUILDING);
1120                         lo->ll_flags |= LF_CRASHED_LASTID;
1121
1122                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY4) &&
1123                             cfs_fail_val > 0) {
1124                                 struct l_wait_info lwi = LWI_TIMEOUT(
1125                                                 cfs_time_seconds(cfs_fail_val),
1126                                                 NULL, NULL);
1127
1128                                 up_write(&com->lc_sem);
1129                                 l_wait_event(lfsck->li_thread.t_ctl_waitq,
1130                                              !thread_is_running(&lfsck->li_thread),
1131                                              &lwi);
1132                                 down_write(&com->lc_sem);
1133                         }
1134                 }
1135
1136                 rc = lfsck_layout_lastid_create(env, lfsck, obj);
1137         } else {
1138                 dt_read_lock(env, obj, 0);
1139                 rc = dt_read(env, obj,
1140                         lfsck_buf_get(env, &lls->lls_lastid, sizeof(__u64)),
1141                         &pos);
1142                 dt_read_unlock(env, obj);
1143                 if (rc != 0 && rc != sizeof(__u64))
1144                         GOTO(out, rc = (rc > 0 ? -EFAULT : rc));
1145
1146                 if (rc == 0 && !(lo->ll_flags & LF_CRASHED_LASTID)) {
1147                         LASSERT(lfsck->li_out_notify != NULL);
1148
1149                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
1150                                              LE_LASTID_REBUILDING);
1151                         lo->ll_flags |= LF_CRASHED_LASTID;
1152                 }
1153
1154                 lls->lls_lastid = le64_to_cpu(lls->lls_lastid);
1155                 rc = 0;
1156         }
1157
1158         GOTO(out, rc);
1159
1160 out:
1161         if (rc != 0)
1162                 lfsck_object_put(env, obj);
1163         else
1164                 lls->lls_lastid_obj = obj;
1165
1166         return rc;
1167 }
1168
1169 static void lfsck_layout_record_failure(const struct lu_env *env,
1170                                                  struct lfsck_instance *lfsck,
1171                                                  struct lfsck_layout *lo)
1172 {
1173         lo->ll_objs_failed_phase1++;
1174         if (unlikely(lo->ll_pos_first_inconsistent == 0)) {
1175                 lo->ll_pos_first_inconsistent =
1176                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
1177                                                         lfsck->li_di_oit);
1178
1179                 CDEBUG(D_LFSCK, "%s: layout LFSCK hit first non-repaired "
1180                        "inconsistency at the pos ["LPU64"]\n",
1181                        lfsck_lfsck2name(lfsck),
1182                        lo->ll_pos_first_inconsistent);
1183         }
1184 }
1185
1186 static int lfsck_layout_master_async_interpret(const struct lu_env *env,
1187                                                struct ptlrpc_request *req,
1188                                                void *args, int rc)
1189 {
1190         struct lfsck_async_interpret_args *laia = args;
1191         struct lfsck_component            *com  = laia->laia_com;
1192         struct lfsck_layout_master_data   *llmd = com->lc_data;
1193         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1194         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1195         struct lfsck_request              *lr   = laia->laia_lr;
1196
1197         switch (lr->lr_event) {
1198         case LE_START:
1199                 if (rc != 0) {
1200                         struct lfsck_layout *lo = com->lc_file_ram;
1201
1202                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout "
1203                                "start: rc = %d\n",
1204                                lfsck_lfsck2name(com->lc_lfsck),
1205                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1206                                ltd->ltd_index, rc);
1207                         lo->ll_flags |= LF_INCOMPLETE;
1208                         break;
1209                 }
1210
1211                 spin_lock(&ltds->ltd_lock);
1212                 if (ltd->ltd_dead || ltd->ltd_layout_done) {
1213                         spin_unlock(&ltds->ltd_lock);
1214                         break;
1215                 }
1216
1217                 if (lr->lr_flags & LEF_TO_OST) {
1218                         if (list_empty(&ltd->ltd_layout_list))
1219                                 list_add_tail(&ltd->ltd_layout_list,
1220                                               &llmd->llmd_ost_list);
1221                         if (list_empty(&ltd->ltd_layout_phase_list))
1222                                 list_add_tail(&ltd->ltd_layout_phase_list,
1223                                               &llmd->llmd_ost_phase1_list);
1224                 } else {
1225                         if (list_empty(&ltd->ltd_layout_list))
1226                                 list_add_tail(&ltd->ltd_layout_list,
1227                                               &llmd->llmd_mdt_list);
1228                         if (list_empty(&ltd->ltd_layout_phase_list))
1229                                 list_add_tail(&ltd->ltd_layout_phase_list,
1230                                               &llmd->llmd_mdt_phase1_list);
1231                 }
1232                 spin_unlock(&ltds->ltd_lock);
1233                 break;
1234         case LE_STOP:
1235         case LE_PHASE1_DONE:
1236         case LE_PHASE2_DONE:
1237         case LE_PEER_EXIT:
1238                 if (rc != 0 && rc != -EALREADY)
1239                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for layout: "
1240                                "event = %d, rc = %d\n",
1241                                lfsck_lfsck2name(com->lc_lfsck),
1242                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1243                                ltd->ltd_index, lr->lr_event, rc);
1244                 break;
1245         case LE_QUERY: {
1246                 struct lfsck_reply *reply;
1247
1248                 if (rc != 0) {
1249                         spin_lock(&ltds->ltd_lock);
1250                         list_del_init(&ltd->ltd_layout_phase_list);
1251                         list_del_init(&ltd->ltd_layout_list);
1252                         spin_unlock(&ltds->ltd_lock);
1253                         break;
1254                 }
1255
1256                 reply = req_capsule_server_get(&req->rq_pill,
1257                                                &RMF_LFSCK_REPLY);
1258                 if (reply == NULL) {
1259                         rc = -EPROTO;
1260                         CDEBUG(D_LFSCK, "%s:  invalid query reply: rc = %d\n",
1261                                lfsck_lfsck2name(com->lc_lfsck), rc);
1262                         spin_lock(&ltds->ltd_lock);
1263                         list_del_init(&ltd->ltd_layout_phase_list);
1264                         list_del_init(&ltd->ltd_layout_list);
1265                         spin_unlock(&ltds->ltd_lock);
1266                         break;
1267                 }
1268
1269                 switch (reply->lr_status) {
1270                 case LS_SCANNING_PHASE1:
1271                         break;
1272                 case LS_SCANNING_PHASE2:
1273                         spin_lock(&ltds->ltd_lock);
1274                         list_del_init(&ltd->ltd_layout_phase_list);
1275                         if (ltd->ltd_dead || ltd->ltd_layout_done) {
1276                                 spin_unlock(&ltds->ltd_lock);
1277                                 break;
1278                         }
1279
1280                         if (lr->lr_flags & LEF_TO_OST)
1281                                 list_add_tail(&ltd->ltd_layout_phase_list,
1282                                               &llmd->llmd_ost_phase2_list);
1283                         else
1284                                 list_add_tail(&ltd->ltd_layout_phase_list,
1285                                               &llmd->llmd_mdt_phase2_list);
1286                         spin_unlock(&ltds->ltd_lock);
1287                         break;
1288                 default:
1289                         spin_lock(&ltds->ltd_lock);
1290                         list_del_init(&ltd->ltd_layout_phase_list);
1291                         list_del_init(&ltd->ltd_layout_list);
1292                         spin_unlock(&ltds->ltd_lock);
1293                         break;
1294                 }
1295                 break;
1296         }
1297         default:
1298                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1299                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1300                 break;
1301         }
1302
1303         if (!laia->laia_shared) {
1304                 lfsck_tgt_put(ltd);
1305                 lfsck_component_put(env, com);
1306         }
1307
1308         return 0;
1309 }
1310
1311 static int lfsck_layout_master_query_others(const struct lu_env *env,
1312                                             struct lfsck_component *com)
1313 {
1314         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1315         struct lfsck_request              *lr    = &info->lti_lr;
1316         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1317         struct lfsck_instance             *lfsck = com->lc_lfsck;
1318         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1319         struct ptlrpc_request_set         *set;
1320         struct lfsck_tgt_descs            *ltds;
1321         struct lfsck_tgt_desc             *ltd;
1322         struct list_head                  *head;
1323         int                                rc    = 0;
1324         int                                rc1   = 0;
1325         ENTRY;
1326
1327         set = ptlrpc_prep_set();
1328         if (set == NULL)
1329                 RETURN(-ENOMEM);
1330
1331         llmd->llmd_touch_gen++;
1332         memset(lr, 0, sizeof(*lr));
1333         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1334         lr->lr_event = LE_QUERY;
1335         lr->lr_active = LFSCK_TYPE_LAYOUT;
1336         laia->laia_com = com;
1337         laia->laia_lr = lr;
1338         laia->laia_shared = 0;
1339
1340         if (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1341                 ltds = &lfsck->li_mdt_descs;
1342                 lr->lr_flags = 0;
1343                 head = &llmd->llmd_mdt_phase1_list;
1344         } else {
1345
1346 again:
1347                 ltds = &lfsck->li_ost_descs;
1348                 lr->lr_flags = LEF_TO_OST;
1349                 head = &llmd->llmd_ost_phase1_list;
1350         }
1351
1352         laia->laia_ltds = ltds;
1353         spin_lock(&ltds->ltd_lock);
1354         while (!list_empty(head)) {
1355                 ltd = list_entry(head->next,
1356                                  struct lfsck_tgt_desc,
1357                                  ltd_layout_phase_list);
1358                 if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1359                         break;
1360
1361                 ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1362                 list_del(&ltd->ltd_layout_phase_list);
1363                 list_add_tail(&ltd->ltd_layout_phase_list, head);
1364                 atomic_inc(&ltd->ltd_ref);
1365                 laia->laia_ltd = ltd;
1366                 spin_unlock(&ltds->ltd_lock);
1367                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1368                                          lfsck_layout_master_async_interpret,
1369                                          laia, LFSCK_QUERY);
1370                 if (rc != 0) {
1371                         CDEBUG(D_LFSCK, "%s: layout LFSCK fail to query %s %x: "
1372                                "rc = %d\n", lfsck_lfsck2name(lfsck),
1373                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1374                                ltd->ltd_index, rc);
1375                         lfsck_tgt_put(ltd);
1376                         rc1 = rc;
1377                 }
1378                 spin_lock(&ltds->ltd_lock);
1379         }
1380         spin_unlock(&ltds->ltd_lock);
1381
1382         rc = ptlrpc_set_wait(set);
1383         if (rc < 0) {
1384                 ptlrpc_set_destroy(set);
1385                 RETURN(rc);
1386         }
1387
1388         if (!(lr->lr_flags & LEF_TO_OST) &&
1389             list_empty(&llmd->llmd_mdt_phase1_list))
1390                 goto again;
1391
1392         ptlrpc_set_destroy(set);
1393
1394         RETURN(rc1 != 0 ? rc1 : rc);
1395 }
1396
1397 static inline bool
1398 lfsck_layout_master_to_orphan(struct lfsck_layout_master_data *llmd)
1399 {
1400         return list_empty(&llmd->llmd_mdt_phase1_list) &&
1401                (!list_empty(&llmd->llmd_ost_phase2_list) ||
1402                 list_empty(&llmd->llmd_ost_phase1_list));
1403 }
1404
1405 static int lfsck_layout_master_notify_others(const struct lu_env *env,
1406                                              struct lfsck_component *com,
1407                                              struct lfsck_request *lr)
1408 {
1409         struct lfsck_thread_info          *info  = lfsck_env_info(env);
1410         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1411         struct lfsck_instance             *lfsck = com->lc_lfsck;
1412         struct lfsck_layout_master_data   *llmd  = com->lc_data;
1413         struct lfsck_layout               *lo    = com->lc_file_ram;
1414         struct lfsck_bookmark             *bk    = &lfsck->li_bookmark_ram;
1415         struct ptlrpc_request_set         *set;
1416         struct lfsck_tgt_descs            *ltds;
1417         struct lfsck_tgt_desc             *ltd;
1418         struct lfsck_tgt_desc             *next;
1419         struct list_head                  *head;
1420         __u32                              idx;
1421         int                                rc    = 0;
1422         ENTRY;
1423
1424         set = ptlrpc_prep_set();
1425         if (set == NULL)
1426                 RETURN(-ENOMEM);
1427
1428         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1429         lr->lr_active = LFSCK_TYPE_LAYOUT;
1430         laia->laia_com = com;
1431         laia->laia_lr = lr;
1432         laia->laia_shared = 0;
1433         switch (lr->lr_event) {
1434         case LE_START:
1435                 /* Notify OSTs firstly, then handle other MDTs if needed. */
1436                 ltds = &lfsck->li_ost_descs;
1437                 laia->laia_ltds = ltds;
1438                 down_read(&ltds->ltd_rw_sem);
1439                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1440                         ltd = lfsck_tgt_get(ltds, idx);
1441                         LASSERT(ltd != NULL);
1442
1443                         laia->laia_ltd = ltd;
1444                         ltd->ltd_layout_done = 0;
1445                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1446                                         lfsck_layout_master_async_interpret,
1447                                         laia, LFSCK_NOTIFY);
1448                         if (rc != 0) {
1449                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1450                                        "notify %s %x for start: rc = %d\n",
1451                                        lfsck_lfsck2name(lfsck),
1452                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1453                                        "MDT", idx, rc);
1454                                 lfsck_tgt_put(ltd);
1455                                 lo->ll_flags |= LF_INCOMPLETE;
1456                         }
1457                 }
1458                 up_read(&ltds->ltd_rw_sem);
1459
1460                 /* Sync up */
1461                 rc = ptlrpc_set_wait(set);
1462                 if (rc < 0) {
1463                         ptlrpc_set_destroy(set);
1464                         RETURN(rc);
1465                 }
1466
1467                 if (!(bk->lb_param & LPF_ALL_TGT))
1468                         break;
1469
1470                 /* link other MDT targets locallly. */
1471                 ltds = &lfsck->li_mdt_descs;
1472                 spin_lock(&ltds->ltd_lock);
1473                 cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1474                         ltd = LTD_TGT(ltds, idx);
1475                         LASSERT(ltd != NULL);
1476
1477                         if (!list_empty(&ltd->ltd_layout_list))
1478                                 continue;
1479
1480                         list_add_tail(&ltd->ltd_layout_list,
1481                                       &llmd->llmd_mdt_list);
1482                         list_add_tail(&ltd->ltd_layout_phase_list,
1483                                       &llmd->llmd_mdt_phase1_list);
1484                 }
1485                 spin_unlock(&ltds->ltd_lock);
1486                 break;
1487         case LE_STOP:
1488         case LE_PHASE2_DONE:
1489         case LE_PEER_EXIT: {
1490                 /* Handle other MDTs firstly if needed, then notify the OSTs. */
1491                 if (bk->lb_param & LPF_ALL_TGT) {
1492                         head = &llmd->llmd_mdt_list;
1493                         ltds = &lfsck->li_mdt_descs;
1494                         if (lr->lr_event == LE_STOP) {
1495                                 /* unlink other MDT targets locallly. */
1496                                 spin_lock(&ltds->ltd_lock);
1497                                 list_for_each_entry_safe(ltd, next, head,
1498                                                          ltd_layout_list) {
1499                                         list_del_init(&ltd->ltd_layout_phase_list);
1500                                         list_del_init(&ltd->ltd_layout_list);
1501                                 }
1502                                 spin_unlock(&ltds->ltd_lock);
1503
1504                                 lr->lr_flags |= LEF_TO_OST;
1505                                 head = &llmd->llmd_ost_list;
1506                                 ltds = &lfsck->li_ost_descs;
1507                         } else {
1508                                 lr->lr_flags &= ~LEF_TO_OST;
1509                         }
1510                 } else {
1511                         lr->lr_flags |= LEF_TO_OST;
1512                         head = &llmd->llmd_ost_list;
1513                         ltds = &lfsck->li_ost_descs;
1514                 }
1515
1516 again:
1517                 laia->laia_ltds = ltds;
1518                 spin_lock(&ltds->ltd_lock);
1519                 while (!list_empty(head)) {
1520                         ltd = list_entry(head->next, struct lfsck_tgt_desc,
1521                                          ltd_layout_list);
1522                         if (!list_empty(&ltd->ltd_layout_phase_list))
1523                                 list_del_init(&ltd->ltd_layout_phase_list);
1524                         list_del_init(&ltd->ltd_layout_list);
1525                         atomic_inc(&ltd->ltd_ref);
1526                         laia->laia_ltd = ltd;
1527                         spin_unlock(&ltds->ltd_lock);
1528                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1529                                         lfsck_layout_master_async_interpret,
1530                                         laia, LFSCK_NOTIFY);
1531                         if (rc != 0) {
1532                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1533                                        "notify %s %x for stop/phase2_done/"
1534                                        "peer_exit: rc = %d\n",
1535                                        lfsck_lfsck2name(lfsck),
1536                                        (lr->lr_flags & LEF_TO_OST) ? "OST" :
1537                                        "MDT", ltd->ltd_index, rc);
1538                                 lfsck_tgt_put(ltd);
1539                         }
1540                         spin_lock(&ltds->ltd_lock);
1541                 }
1542                 spin_unlock(&ltds->ltd_lock);
1543
1544                 rc = ptlrpc_set_wait(set);
1545                 if (rc < 0) {
1546                         ptlrpc_set_destroy(set);
1547                         RETURN(rc);
1548                 }
1549
1550                 if (!(lr->lr_flags & LEF_TO_OST)) {
1551                         lr->lr_flags |= LEF_TO_OST;
1552                         head = &llmd->llmd_ost_list;
1553                         ltds = &lfsck->li_ost_descs;
1554                         goto again;
1555                 }
1556                 break;
1557         }
1558         case LE_PHASE1_DONE:
1559                 llmd->llmd_touch_gen++;
1560                 ltds = &lfsck->li_mdt_descs;
1561                 laia->laia_ltds = ltds;
1562                 spin_lock(&ltds->ltd_lock);
1563                 while (!list_empty(&llmd->llmd_mdt_phase1_list)) {
1564                         ltd = list_entry(llmd->llmd_mdt_phase1_list.next,
1565                                          struct lfsck_tgt_desc,
1566                                          ltd_layout_phase_list);
1567                         if (ltd->ltd_layout_gen == llmd->llmd_touch_gen)
1568                                 break;
1569
1570                         ltd->ltd_layout_gen = llmd->llmd_touch_gen;
1571                         list_del_init(&ltd->ltd_layout_phase_list);
1572                         list_add_tail(&ltd->ltd_layout_phase_list,
1573                                       &llmd->llmd_mdt_phase1_list);
1574                         atomic_inc(&ltd->ltd_ref);
1575                         laia->laia_ltd = ltd;
1576                         spin_unlock(&ltds->ltd_lock);
1577                         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1578                                         lfsck_layout_master_async_interpret,
1579                                         laia, LFSCK_NOTIFY);
1580                         if (rc != 0) {
1581                                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to "
1582                                        "notify MDT %x for phase1_done: "
1583                                        "rc = %d\n", lfsck_lfsck2name(lfsck),
1584                                        ltd->ltd_index, rc);
1585                                 lfsck_tgt_put(ltd);
1586                         }
1587                         spin_lock(&ltds->ltd_lock);
1588                 }
1589                 spin_unlock(&ltds->ltd_lock);
1590                 break;
1591         default:
1592                 CDEBUG(D_LFSCK, "%s: layout LFSCK unexpected event: rc = %d\n",
1593                        lfsck_lfsck2name(lfsck), lr->lr_event);
1594                 rc = -EINVAL;
1595                 break;
1596         }
1597
1598         rc = ptlrpc_set_wait(set);
1599         ptlrpc_set_destroy(set);
1600
1601         RETURN(rc);
1602 }
1603
1604 static int lfsck_layout_double_scan_result(const struct lu_env *env,
1605                                            struct lfsck_component *com,
1606                                            int rc)
1607 {
1608         struct lfsck_instance   *lfsck = com->lc_lfsck;
1609         struct lfsck_layout     *lo    = com->lc_file_ram;
1610         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1611
1612         down_write(&com->lc_sem);
1613         lo->ll_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1614                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1615         lo->ll_time_last_checkpoint = cfs_time_current_sec();
1616         lo->ll_objs_checked_phase2 += com->lc_new_checked;
1617
1618         if (rc > 0) {
1619                 com->lc_journal = 0;
1620                 if (lo->ll_flags & LF_INCOMPLETE)
1621                         lo->ll_status = LS_PARTIAL;
1622                 else
1623                         lo->ll_status = LS_COMPLETED;
1624                 if (!(bk->lb_param & LPF_DRYRUN))
1625                         lo->ll_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1626                 lo->ll_time_last_complete = lo->ll_time_last_checkpoint;
1627                 lo->ll_success_count++;
1628         } else if (rc == 0) {
1629                 lo->ll_status = lfsck->li_status;
1630                 if (lo->ll_status == 0)
1631                         lo->ll_status = LS_STOPPED;
1632         } else {
1633                 lo->ll_status = LS_FAILED;
1634         }
1635
1636         rc = lfsck_layout_store(env, com);
1637         up_write(&com->lc_sem);
1638
1639         return rc;
1640 }
1641
1642 static int lfsck_layout_lock(const struct lu_env *env,
1643                              struct lfsck_component *com,
1644                              struct dt_object *obj,
1645                              struct lustre_handle *lh, __u64 bits)
1646 {
1647         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1648         ldlm_policy_data_t              *policy = &info->lti_policy;
1649         struct ldlm_res_id              *resid  = &info->lti_resid;
1650         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1651         __u64                            flags  = LDLM_FL_ATOMIC_CB;
1652         int                              rc;
1653
1654         LASSERT(lfsck->li_namespace != NULL);
1655
1656         memset(policy, 0, sizeof(*policy));
1657         policy->l_inodebits.bits = bits;
1658         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
1659         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
1660                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
1661                                     ldlm_completion_ast, NULL, NULL, 0,
1662                                     LVB_T_NONE, NULL, lh);
1663         if (rc == ELDLM_OK) {
1664                 rc = 0;
1665         } else {
1666                 memset(lh, 0, sizeof(*lh));
1667                 rc = -EIO;
1668         }
1669
1670         return rc;
1671 }
1672
1673 static void lfsck_layout_unlock(struct lustre_handle *lh)
1674 {
1675         if (lustre_handle_is_used(lh)) {
1676                 ldlm_lock_decref(lh, LCK_EX);
1677                 memset(lh, 0, sizeof(*lh));
1678         }
1679 }
1680
1681 static int lfsck_layout_trans_stop(const struct lu_env *env,
1682                                    struct dt_device *dev,
1683                                    struct thandle *handle, int result)
1684 {
1685         int rc;
1686
1687         handle->th_result = result;
1688         rc = dt_trans_stop(env, dev, handle);
1689         if (rc > 0)
1690                 rc = 0;
1691         else if (rc == 0)
1692                 rc = 1;
1693
1694         return rc;
1695 }
1696
1697 /**
1698  * Get the system default stripe size.
1699  *
1700  * \param[in] env       pointer to the thread context
1701  * \param[in] lfsck     pointer to the lfsck instance
1702  * \param[out] size     pointer to the default stripe size
1703  *
1704  * \retval              0 for success
1705  * \retval              negative error number on failure
1706  */
1707 static int lfsck_layout_get_def_stripesize(const struct lu_env *env,
1708                                            struct lfsck_instance *lfsck,
1709                                            __u32 *size)
1710 {
1711         struct lov_user_md      *lum = &lfsck_env_info(env)->lti_lum;
1712         struct dt_object        *root;
1713         int                      rc;
1714
1715         root = dt_locate(env, lfsck->li_next, &lfsck->li_local_root_fid);
1716         if (IS_ERR(root))
1717                 return PTR_ERR(root);
1718
1719         /* Get the default stripe size via xattr_get on the backend root. */
1720         rc = dt_xattr_get(env, root, lfsck_buf_get(env, lum, sizeof(*lum)),
1721                           XATTR_NAME_LOV, BYPASS_CAPA);
1722         if (rc > 0) {
1723                 /* The lum->lmm_stripe_size is LE mode. The *size also
1724                  * should be LE mode. So it is unnecessary to convert. */
1725                 *size = lum->lmm_stripe_size;
1726                 rc = 0;
1727         } else if (unlikely(rc == 0)) {
1728                 rc = -EINVAL;
1729         }
1730
1731         lfsck_object_put(env, root);
1732
1733         return rc;
1734 }
1735
1736 /**
1737  * \retval       +1: repaired
1738  * \retval        0: did nothing
1739  * \retval      -ve: on error
1740  */
1741 static int lfsck_layout_refill_lovea(const struct lu_env *env,
1742                                      struct thandle *handle,
1743                                      struct dt_object *parent,
1744                                      struct lu_fid *cfid,
1745                                      struct lu_buf *buf,
1746                                      struct lov_ost_data_v1 *slot,
1747                                      int fl, __u32 ost_idx)
1748 {
1749         struct ost_id           *oi     = &lfsck_env_info(env)->lti_oi;
1750         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1751         int                      rc;
1752
1753         fid_to_ostid(cfid, oi);
1754         ostid_cpu_to_le(oi, &slot->l_ost_oi);
1755         slot->l_ost_gen = cpu_to_le32(0);
1756         slot->l_ost_idx = cpu_to_le32(ost_idx);
1757
1758         if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_HOLE) {
1759                 struct lov_ost_data_v1 *objs;
1760                 int                     i;
1761                 __u16                   count;
1762
1763                 count = le16_to_cpu(lmm->lmm_stripe_count);
1764                 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
1765                         objs = &lmm->lmm_objects[0];
1766                 else
1767                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1768                 for (i = 0; i < count; i++, objs++) {
1769                         if (objs != slot && lovea_slot_is_dummy(objs))
1770                                 break;
1771                 }
1772
1773                 /* If the @slot is the last dummy slot to be refilled,
1774                  * then drop LOV_PATTERN_F_HOLE from lmm::lmm_pattern. */
1775                 if (i == count)
1776                         lmm->lmm_pattern &= ~cpu_to_le32(LOV_PATTERN_F_HOLE);
1777         }
1778
1779         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV, fl, handle,
1780                           BYPASS_CAPA);
1781         if (rc == 0)
1782                 rc = 1;
1783
1784         return rc;
1785 }
1786
1787 /**
1788  * \retval       +1: repaired
1789  * \retval        0: did nothing
1790  * \retval      -ve: on error
1791  */
1792 static int lfsck_layout_extend_lovea(const struct lu_env *env,
1793                                      struct lfsck_instance *lfsck,
1794                                      struct thandle *handle,
1795                                      struct dt_object *parent,
1796                                      struct lu_fid *cfid,
1797                                      struct lu_buf *buf, int fl,
1798                                      __u32 ost_idx, __u32 ea_off, bool reset)
1799 {
1800         struct lov_mds_md_v1    *lmm    = buf->lb_buf;
1801         struct lov_ost_data_v1  *objs;
1802         int                      rc;
1803         __u16                    count;
1804         bool                     hole   = false;
1805         ENTRY;
1806
1807         if (fl == LU_XATTR_CREATE || reset) {
1808                 __u32 pattern = LOV_PATTERN_RAID0;
1809
1810                 count = ea_off + 1;
1811                 LASSERT(buf->lb_len == lov_mds_md_size(count, LOV_MAGIC_V1));
1812
1813                 if (ea_off != 0 || reset) {
1814                         pattern |= LOV_PATTERN_F_HOLE;
1815                         hole = true;
1816                 }
1817
1818                 memset(lmm, 0, buf->lb_len);
1819                 lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1);
1820                 lmm->lmm_pattern = cpu_to_le32(pattern);
1821                 fid_to_lmm_oi(lfsck_dto2fid(parent), &lmm->lmm_oi);
1822                 lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi);
1823
1824                 rc = lfsck_layout_get_def_stripesize(env, lfsck,
1825                                                      &lmm->lmm_stripe_size);
1826                 if (rc != 0)
1827                         RETURN(rc);
1828
1829                 objs = &lmm->lmm_objects[ea_off];
1830         } else {
1831                 __u32   magic = le32_to_cpu(lmm->lmm_magic);
1832                 int     gap;
1833
1834                 count = le16_to_cpu(lmm->lmm_stripe_count);
1835                 if (magic == LOV_MAGIC_V1)
1836                         objs = &lmm->lmm_objects[count];
1837                 else
1838                         objs = &((struct lov_mds_md_v3 *)lmm)->
1839                                                         lmm_objects[count];
1840
1841                 gap = ea_off - count;
1842                 if (gap >= 0)
1843                         count = ea_off + 1;
1844                 LASSERT(buf->lb_len == lov_mds_md_size(count, magic));
1845
1846                 if (gap > 0) {
1847                         memset(objs, 0, gap * sizeof(*objs));
1848                         lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_HOLE);
1849                         hole = true;
1850                 }
1851
1852                 lmm->lmm_layout_gen =
1853                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
1854                 objs += gap;
1855         }
1856
1857         lmm->lmm_stripe_count = cpu_to_le16(count);
1858         rc = lfsck_layout_refill_lovea(env, handle, parent, cfid, buf, objs,
1859                                        fl, ost_idx);
1860
1861         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant extend layout EA for "
1862                DFID": parent "DFID", OST-index %u, stripe-index %u, fl %d, "
1863                "reset %s, %s LOV EA hole: rc = %d\n",
1864                lfsck_lfsck2name(lfsck), PFID(cfid), PFID(lfsck_dto2fid(parent)),
1865                ost_idx, ea_off, fl, reset ? "yes" : "no",
1866                hole ? "with" : "without", rc);
1867
1868         RETURN(rc);
1869 }
1870
1871 /**
1872  * \retval       +1: repaired
1873  * \retval        0: did nothing
1874  * \retval      -ve: on error
1875  */
1876 static int lfsck_layout_update_pfid(const struct lu_env *env,
1877                                     struct lfsck_component *com,
1878                                     struct dt_object *parent,
1879                                     struct lu_fid *cfid,
1880                                     struct dt_device *cdev, __u32 ea_off)
1881 {
1882         struct filter_fid       *pfid   = &lfsck_env_info(env)->lti_new_pfid;
1883         struct dt_object        *child;
1884         struct thandle          *handle;
1885         const struct lu_fid     *tfid   = lu_object_fid(&parent->do_lu);
1886         struct lu_buf           *buf;
1887         int                      rc     = 0;
1888         ENTRY;
1889
1890         child = lfsck_object_find_by_dev(env, cdev, cfid);
1891         if (IS_ERR(child))
1892                 RETURN(PTR_ERR(child));
1893
1894         handle = dt_trans_create(env, cdev);
1895         if (IS_ERR(handle))
1896                 GOTO(out, rc = PTR_ERR(handle));
1897
1898         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
1899         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
1900         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
1901          * MDT-object's FID::f_ver, instead it is the OST-object index in its
1902          * parent MDT-object's layout EA. */
1903         pfid->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
1904         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
1905
1906         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
1907         if (rc != 0)
1908                 GOTO(stop, rc);
1909
1910         rc = dt_trans_start(env, cdev, handle);
1911         if (rc != 0)
1912                 GOTO(stop, rc);
1913
1914         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
1915                           BYPASS_CAPA);
1916
1917         GOTO(stop, rc = (rc == 0 ? 1 : rc));
1918
1919 stop:
1920         dt_trans_stop(env, cdev, handle);
1921
1922 out:
1923         lu_object_put(env, &child->do_lu);
1924
1925         return rc;
1926 }
1927
1928 /**
1929  * This function will create the MDT-object with the given (partial) LOV EA.
1930  *
1931  * Under some data corruption cases, the MDT-object of the file may be lost,
1932  * but its OST-objects, or some of them are there. The layout LFSCK needs to
1933  * re-create the MDT-object with the orphan OST-object(s) information.
1934  *
1935  * On the other hand, the LFSCK may has created some OST-object for repairing
1936  * dangling LOV EA reference, but as the LFSCK processing, it may find that
1937  * the old OST-object is there and should replace the former new created OST
1938  * object. Unfortunately, some others have modified such newly created object.
1939  * To keep the data (both new and old), the LFSCK will create MDT-object with
1940  * new FID to reference the original OST-object.
1941  *
1942  * \param[in] env       pointer to the thread context
1943  * \param[in] com       pointer to the lfsck component
1944  * \param[in] ltd       pointer to target device descriptor
1945  * \param[in] rec       pointer to the record for the orphan OST-object
1946  * \param[in] cfid      pointer to FID for the orphan OST-object
1947  * \param[in] infix     additional information, such as the FID for original
1948  *                      MDT-object and the stripe offset in the LOV EA
1949  * \param[in] type      the type for describing why the orphan MDT-object is
1950  *                      created. The rules are as following:
1951  *
1952  *  type "C":           Multiple OST-objects claim the same MDT-object and the
1953  *                      same slot in the layout EA. Then the LFSCK will create
1954  *                      new MDT-object(s) to hold the conflict OST-object(s).
1955  *
1956  *  type "N":           The orphan OST-object does not know which one was the
1957  *                      real parent MDT-object, so the LFSCK uses new FID for
1958  *                      its parent MDT-object.
1959  *
1960  *  type "R":           The orphan OST-object knows its parent MDT-object FID,
1961  *                      but does not know the position (the file name) in the
1962  *                      namespace.
1963  *
1964  * The orphan name will be like:
1965  * ${FID}-${infix}-${type}-${conflict_version}
1966  *
1967  * \param[in] ea_off    the stripe offset in the LOV EA
1968  *
1969  * \retval              positive on repaired something
1970  * \retval              0 if needs to repair nothing
1971  * \retval              negative error number on failure
1972  */
1973 static int lfsck_layout_recreate_parent(const struct lu_env *env,
1974                                         struct lfsck_component *com,
1975                                         struct lfsck_tgt_desc *ltd,
1976                                         struct lu_orphan_rec *rec,
1977                                         struct lu_fid *cfid,
1978                                         const char *infix,
1979                                         const char *type,
1980                                         __u32 ea_off)
1981 {
1982         struct lfsck_thread_info        *info   = lfsck_env_info(env);
1983         char                            *name   = info->lti_key;
1984         struct lu_attr                  *la     = &info->lti_la;
1985         struct dt_object_format         *dof    = &info->lti_dof;
1986         struct lfsck_instance           *lfsck  = com->lc_lfsck;
1987         struct lu_fid                   *pfid   = &rec->lor_fid;
1988         struct lu_fid                   *tfid   = &info->lti_fid3;
1989         struct dt_device                *next   = lfsck->li_next;
1990         struct dt_object                *pobj   = NULL;
1991         struct dt_object                *cobj   = NULL;
1992         struct thandle                  *th     = NULL;
1993         struct lu_buf                   *pbuf   = NULL;
1994         struct lu_buf                   *ea_buf = &info->lti_big_buf;
1995         struct lustre_handle             lh     = { 0 };
1996         int                              buflen = ea_buf->lb_len;
1997         int                              idx    = 0;
1998         int                              rc     = 0;
1999         ENTRY;
2000
2001         /* Create .lustre/lost+found/MDTxxxx when needed. */
2002         if (unlikely(lfsck->li_lpf_obj == NULL)) {
2003                 rc = lfsck_create_lpf(env, lfsck);
2004                 if (rc != 0)
2005                         GOTO(log, rc);
2006         }
2007
2008         if (fid_is_zero(pfid)) {
2009                 struct filter_fid *ff = &info->lti_new_pfid;
2010
2011                 rc = lfsck_fid_alloc(env, lfsck, pfid, false);
2012                 if (rc != 0)
2013                         RETURN(rc);
2014
2015                 ff->ff_parent.f_seq = cpu_to_le64(pfid->f_seq);
2016                 ff->ff_parent.f_oid = cpu_to_le32(pfid->f_oid);
2017                 /* Currently, the filter_fid::ff_parent::f_ver is not the
2018                  * real parent MDT-object's FID::f_ver, instead it is the
2019                  * OST-object index in its parent MDT-object's layout EA. */
2020                 ff->ff_parent.f_stripe_idx = cpu_to_le32(ea_off);
2021                 pbuf = lfsck_buf_get(env, ff, sizeof(struct filter_fid));
2022                 cobj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, cfid);
2023                 if (IS_ERR(cobj))
2024                         GOTO(log, rc = PTR_ERR(cobj));
2025         }
2026
2027         pobj = lfsck_object_find_by_dev(env, lfsck->li_bottom, pfid);
2028         if (IS_ERR(pobj))
2029                 GOTO(put, rc = PTR_ERR(pobj));
2030
2031         LASSERT(infix != NULL);
2032         LASSERT(type != NULL);
2033
2034         do {
2035                 snprintf(name, NAME_MAX, DFID"%s-%s-%d", PFID(pfid), infix,
2036                          type, idx++);
2037                 rc = dt_lookup(env, lfsck->li_lpf_obj, (struct dt_rec *)tfid,
2038                                (const struct dt_key *)name, BYPASS_CAPA);
2039                 if (rc != 0 && rc != -ENOENT)
2040                         GOTO(put, rc);
2041         } while (rc == 0);
2042
2043         memset(la, 0, sizeof(*la));
2044         la->la_uid = rec->lor_uid;
2045         la->la_gid = rec->lor_gid;
2046         la->la_mode = S_IFREG | S_IRUSR;
2047         la->la_valid = LA_MODE | LA_UID | LA_GID;
2048
2049         memset(dof, 0, sizeof(*dof));
2050         dof->dof_type = dt_mode_to_dft(S_IFREG);
2051
2052         rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2053         if (buflen < rc) {
2054                 lu_buf_realloc(ea_buf, rc);
2055                 buflen = ea_buf->lb_len;
2056                 if (ea_buf->lb_buf == NULL)
2057                         GOTO(put, rc = -ENOMEM);
2058         } else {
2059                 ea_buf->lb_len = rc;
2060         }
2061
2062         /* Hold update lock on the .lustre/lost+found/MDTxxxx/.
2063          *
2064          * XXX: Currently, we do not grab the PDO lock as normal create cases,
2065          *      because creating MDT-object for orphan OST-object is rare, we
2066          *      do not much care about the performance. It can be improved in
2067          *      the future when needed. */
2068         rc = lfsck_layout_lock(env, com, lfsck->li_lpf_obj, &lh,
2069                                MDS_INODELOCK_UPDATE);
2070         if (rc != 0)
2071                 GOTO(put, rc);
2072
2073         th = dt_trans_create(env, next);
2074         if (IS_ERR(th))
2075                 GOTO(unlock, rc = PTR_ERR(th));
2076
2077         /* 1a. Update OST-object's parent information remotely.
2078          *
2079          * If other subsequent modifications failed, then next LFSCK scanning
2080          * will process the OST-object as orphan again with known parent FID. */
2081         if (cobj != NULL) {
2082                 rc = dt_declare_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th);
2083                 if (rc != 0)
2084                         GOTO(stop, rc);
2085         }
2086
2087         /* 2a. Create the MDT-object locally. */
2088         rc = dt_declare_create(env, pobj, la, NULL, dof, th);
2089         if (rc != 0)
2090                 GOTO(stop, rc);
2091
2092         /* 3a. Add layout EA for the MDT-object. */
2093         rc = dt_declare_xattr_set(env, pobj, ea_buf, XATTR_NAME_LOV,
2094                                   LU_XATTR_CREATE, th);
2095         if (rc != 0)
2096                 GOTO(stop, rc);
2097
2098         /* 4a. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2099         rc = dt_declare_insert(env, lfsck->li_lpf_obj,
2100                                (const struct dt_rec *)pfid,
2101                                (const struct dt_key *)name, th);
2102         if (rc != 0)
2103                 GOTO(stop, rc);
2104
2105         rc = dt_trans_start(env, next, th);
2106         if (rc != 0)
2107                 GOTO(stop, rc);
2108
2109         /* 1b. Update OST-object's parent information remotely. */
2110         if (cobj != NULL) {
2111                 rc = dt_xattr_set(env, cobj, pbuf, XATTR_NAME_FID, 0, th,
2112                                   BYPASS_CAPA);
2113                 if (rc != 0)
2114                         GOTO(stop, rc);
2115         }
2116
2117         dt_write_lock(env, pobj, 0);
2118         /* 2b. Create the MDT-object locally. */
2119         rc = dt_create(env, pobj, la, NULL, dof, th);
2120         if (rc == 0)
2121                 /* 3b. Add layout EA for the MDT-object. */
2122                 rc = lfsck_layout_extend_lovea(env, lfsck, th, pobj, cfid,
2123                                                ea_buf, LU_XATTR_CREATE,
2124                                                ltd->ltd_index, ea_off, false);
2125         dt_write_unlock(env, pobj);
2126         if (rc < 0)
2127                 GOTO(stop, rc);
2128
2129         /* 4b. Insert the MDT-object to .lustre/lost+found/MDTxxxx/ */
2130         rc = dt_insert(env, lfsck->li_lpf_obj,
2131                        (const struct dt_rec *)pfid,
2132                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2133
2134         GOTO(stop, rc);
2135
2136 stop:
2137         dt_trans_stop(env, next, th);
2138
2139 unlock:
2140         lfsck_layout_unlock(&lh);
2141
2142 put:
2143         if (cobj != NULL && !IS_ERR(cobj))
2144                 lu_object_put(env, &cobj->do_lu);
2145         if (pobj != NULL && !IS_ERR(pobj))
2146                 lu_object_put(env, &pobj->do_lu);
2147         ea_buf->lb_len = buflen;
2148
2149 log:
2150         if (rc < 0)
2151                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant failed to "
2152                        "recreate the lost MDT-object: parent "DFID
2153                        ", child "DFID", OST-index %u, stripe-index %u, "
2154                        "infix %s, type %s: rc = %d\n",
2155                        lfsck_lfsck2name(lfsck), PFID(pfid), PFID(cfid),
2156                        ltd->ltd_index, ea_off, infix, type, rc);
2157
2158         return rc >= 0 ? 1 : rc;
2159 }
2160
2161 static int lfsck_layout_master_conditional_destroy(const struct lu_env *env,
2162                                                    struct lfsck_component *com,
2163                                                    const struct lu_fid *fid,
2164                                                    __u32 index)
2165 {
2166         struct lfsck_thread_info *info  = lfsck_env_info(env);
2167         struct lfsck_request     *lr    = &info->lti_lr;
2168         struct lfsck_instance    *lfsck = com->lc_lfsck;
2169         struct lfsck_tgt_desc    *ltd;
2170         struct ptlrpc_request    *req;
2171         struct lfsck_request     *tmp;
2172         struct obd_export        *exp;
2173         int                       rc    = 0;
2174         ENTRY;
2175
2176         ltd = lfsck_tgt_get(&lfsck->li_ost_descs, index);
2177         if (unlikely(ltd == NULL))
2178                 RETURN(-ENXIO);
2179
2180         exp = ltd->ltd_exp;
2181         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
2182                 GOTO(put, rc = -EOPNOTSUPP);
2183
2184         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
2185         if (req == NULL)
2186                 GOTO(put, rc = -ENOMEM);
2187
2188         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
2189         if (rc != 0) {
2190                 ptlrpc_request_free(req);
2191
2192                 GOTO(put, rc);
2193         }
2194
2195         memset(lr, 0, sizeof(*lr));
2196         lr->lr_event = LE_CONDITIONAL_DESTROY;
2197         lr->lr_active = LFSCK_TYPE_LAYOUT;
2198         lr->lr_fid = *fid;
2199
2200         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2201         *tmp = *lr;
2202         ptlrpc_request_set_replen(req);
2203
2204         rc = ptlrpc_queue_wait(req);
2205         ptlrpc_req_finished(req);
2206
2207         GOTO(put, rc);
2208
2209 put:
2210         lfsck_tgt_put(ltd);
2211
2212         return rc;
2213 }
2214
2215 static int lfsck_layout_slave_conditional_destroy(const struct lu_env *env,
2216                                                   struct lfsck_component *com,
2217                                                   struct lfsck_request *lr)
2218 {
2219         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2220         struct lu_attr                  *la     = &info->lti_la;
2221         ldlm_policy_data_t              *policy = &info->lti_policy;
2222         struct ldlm_res_id              *resid  = &info->lti_resid;
2223         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2224         struct dt_device                *dev    = lfsck->li_bottom;
2225         struct lu_fid                   *fid    = &lr->lr_fid;
2226         struct dt_object                *obj;
2227         struct thandle                  *th     = NULL;
2228         struct lustre_handle             lh     = { 0 };
2229         __u64                            flags  = 0;
2230         int                              rc     = 0;
2231         ENTRY;
2232
2233         obj = lfsck_object_find_by_dev(env, dev, fid);
2234         if (IS_ERR(obj))
2235                 RETURN(PTR_ERR(obj));
2236
2237         dt_read_lock(env, obj, 0);
2238         if (dt_object_exists(obj) == 0) {
2239                 dt_read_unlock(env, obj);
2240
2241                 GOTO(put, rc = -ENOENT);
2242         }
2243
2244         /* Get obj's attr without lock firstly. */
2245         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2246         dt_read_unlock(env, obj);
2247         if (rc != 0)
2248                 GOTO(put, rc);
2249
2250         if (likely(la->la_ctime != 0 || la->la_mode & S_ISUID))
2251                 GOTO(put, rc = -ETXTBSY);
2252
2253         /* Acquire extent lock on [0, EOF] to sync with all possible written. */
2254         LASSERT(lfsck->li_namespace != NULL);
2255
2256         memset(policy, 0, sizeof(*policy));
2257         policy->l_extent.end = OBD_OBJECT_EOF;
2258         ost_fid_build_resid(fid, resid);
2259         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_EXTENT,
2260                                     policy, LCK_EX, &flags, ldlm_blocking_ast,
2261                                     ldlm_completion_ast, NULL, NULL, 0,
2262                                     LVB_T_NONE, NULL, &lh);
2263         if (rc != ELDLM_OK)
2264                 GOTO(put, rc = -EIO);
2265
2266         dt_write_lock(env, obj, 0);
2267         /* Get obj's attr within lock again. */
2268         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2269         if (rc != 0)
2270                 GOTO(unlock, rc);
2271
2272         if (la->la_ctime != 0)
2273                 GOTO(unlock, rc = -ETXTBSY);
2274
2275         th = dt_trans_create(env, dev);
2276         if (IS_ERR(th))
2277                 GOTO(unlock, rc = PTR_ERR(th));
2278
2279         rc = dt_declare_ref_del(env, obj, th);
2280         if (rc != 0)
2281                 GOTO(stop, rc);
2282
2283         rc = dt_declare_destroy(env, obj, th);
2284         if (rc != 0)
2285                 GOTO(stop, rc);
2286
2287         rc = dt_trans_start_local(env, dev, th);
2288         if (rc != 0)
2289                 GOTO(stop, rc);
2290
2291         rc = dt_ref_del(env, obj, th);
2292         if (rc != 0)
2293                 GOTO(stop, rc);
2294
2295         rc = dt_destroy(env, obj, th);
2296         if (rc == 0)
2297                 CDEBUG(D_LFSCK, "%s: layout LFSCK destroyed the empty "
2298                        "OST-object "DFID" that was created for reparing "
2299                        "dangling referenced case. But the original missed "
2300                        "OST-object is found now.\n",
2301                        lfsck_lfsck2name(lfsck), PFID(fid));
2302
2303         GOTO(stop, rc);
2304
2305 stop:
2306         dt_trans_stop(env, dev, th);
2307
2308 unlock:
2309         dt_write_unlock(env, obj);
2310         ldlm_lock_decref(&lh, LCK_EX);
2311
2312 put:
2313         lu_object_put(env, &obj->do_lu);
2314
2315         return rc;
2316 }
2317
2318 /**
2319  * Some OST-object has occupied the specified layout EA slot.
2320  * Such OST-object may be generated by the LFSCK when repair
2321  * dangling referenced MDT-object, which can be indicated by
2322  * attr::la_ctime == 0 but without S_ISUID in la_mode. If it
2323  * is true and such OST-object has not been modified yet, we
2324  * will replace it with the orphan OST-object; otherwise the
2325  * LFSCK will create new MDT-object to reference the orphan.
2326  *
2327  * \retval       +1: repaired
2328  * \retval        0: did nothing
2329  * \retval      -ve: on error
2330  */
2331 static int lfsck_layout_conflict_create(const struct lu_env *env,
2332                                         struct lfsck_component *com,
2333                                         struct lfsck_tgt_desc *ltd,
2334                                         struct lu_orphan_rec *rec,
2335                                         struct dt_object *parent,
2336                                         struct lu_fid *cfid,
2337                                         struct lu_buf *ea_buf,
2338                                         struct lov_ost_data_v1 *slot,
2339                                         __u32 ea_off, __u32 ori_len)
2340 {
2341         struct lfsck_thread_info *info          = lfsck_env_info(env);
2342         struct lu_fid            *cfid2         = &info->lti_fid2;
2343         struct ost_id            *oi            = &info->lti_oi;
2344         char                     *infix         = info->lti_tmpbuf;
2345         struct lov_mds_md_v1     *lmm           = ea_buf->lb_buf;
2346         struct dt_device         *dev           = com->lc_lfsck->li_bottom;
2347         struct thandle           *th            = NULL;
2348         struct lustre_handle      lh            = { 0 };
2349         __u32                     ost_idx2      = le32_to_cpu(slot->l_ost_idx);
2350         int                       rc            = 0;
2351         ENTRY;
2352
2353         ostid_le_to_cpu(&slot->l_ost_oi, oi);
2354         ostid_to_fid(cfid2, oi, ost_idx2);
2355
2356         /* Hold layout lock on the parent to prevent others to access. */
2357         rc = lfsck_layout_lock(env, com, parent, &lh,
2358                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2359         if (rc != 0)
2360                 GOTO(out, rc);
2361
2362         rc = lfsck_layout_master_conditional_destroy(env, com, cfid2, ost_idx2);
2363
2364         /* If the conflict OST-obejct is not created for fixing dangling
2365          * referenced MDT-object in former LFSCK check/repair, or it has
2366          * been modified by others, then we cannot destroy it. Re-create
2367          * a new MDT-object for the orphan OST-object. */
2368         if (rc == -ETXTBSY) {
2369                 /* No need the layout lock on the original parent. */
2370                 lfsck_layout_unlock(&lh);
2371                 ea_buf->lb_len = ori_len;
2372
2373                 fid_zero(&rec->lor_fid);
2374                 snprintf(infix, LFSCK_TMPBUF_LEN, "-"DFID"-%x",
2375                          PFID(lu_object_fid(&parent->do_lu)), ea_off);
2376                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2377                                                   infix, "C", ea_off);
2378
2379                 RETURN(rc);
2380         }
2381
2382         if (rc != 0 && rc != -ENOENT)
2383                 GOTO(unlock, rc);
2384
2385         th = dt_trans_create(env, dev);
2386         if (IS_ERR(th))
2387                 GOTO(unlock, rc = PTR_ERR(th));
2388
2389         rc = dt_declare_xattr_set(env, parent, ea_buf, XATTR_NAME_LOV,
2390                                   LU_XATTR_REPLACE, th);
2391         if (rc != 0)
2392                 GOTO(stop, rc);
2393
2394         rc = dt_trans_start_local(env, dev, th);
2395         if (rc != 0)
2396                 GOTO(stop, rc);
2397
2398         dt_write_lock(env, parent, 0);
2399         lmm->lmm_layout_gen = cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2400         rc = lfsck_layout_refill_lovea(env, th, parent, cfid, ea_buf, slot,
2401                                        LU_XATTR_REPLACE, ltd->ltd_index);
2402         dt_write_unlock(env, parent);
2403
2404         GOTO(stop, rc);
2405
2406 stop:
2407         dt_trans_stop(env, dev, th);
2408
2409 unlock:
2410         lfsck_layout_unlock(&lh);
2411
2412 out:
2413         ea_buf->lb_len = ori_len;
2414
2415         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant replaced the conflict "
2416                "OST-object "DFID" on the OST %x with the orphan "DFID" on "
2417                "the OST %x: parent "DFID", stripe-index %u: rc = %d\n",
2418                lfsck_lfsck2name(com->lc_lfsck), PFID(cfid2), ost_idx2,
2419                PFID(cfid), ltd->ltd_index, PFID(lfsck_dto2fid(parent)),
2420                ea_off, rc);
2421
2422         return rc >= 0 ? 1 : rc;
2423 }
2424
2425 /**
 * Put the orphan OST-object \a cfid back into stripe slot \a ea_off of the
 * LOV EA of \a parent, while holding the layout/xattr ibits lock on the
 * parent.
 *
 * Depending on what is found in the parent's LOV EA:
 * - no LOV EA (-ENODATA) or a zero-sized one: a new EA is created;
 * - lfsck_layout_verify_header() answers -EINVAL: the buffer is zeroed
 *   and the EA is rebuilt;
 * - any other header error: the EA is kept and the error is returned;
 * - stripe count <= \a ea_off: the EA is extended;
 * - slot \a ea_off is a dummy slot: it is refilled with \a cfid;
 * - \a cfid already sits in slot \a ea_off: nothing to do, returns 0;
 * - \a cfid sits in a different slot: the OST-object's parent FID is
 *   updated via lfsck_layout_update_pfid() to match that slot;
 * - otherwise slot \a ea_off is occupied by another object and the case
 *   is handed over to lfsck_layout_conflict_create().
 *
 * With LPF_DRYRUN set, every path that would modify something returns 1
 * instead, and no transaction is created.
 *
 * \param[in] env       execution environment
 * \param[in] com       layout LFSCK component
 * \param[in] ltd       target descriptor (ltd_tgt / ltd_index are used on
 *                      the update-pfid path)
 * \param[in] rec       orphan record, only forwarded to
 *                      lfsck_layout_conflict_create()
 * \param[in] parent    MDT-object whose LOV EA is to be repaired
 * \param[in] cfid      FID of the orphan OST-object
 * \param[in] ost_idx   OST index to record for \a cfid
 * \param[in] ea_off    stripe index claimed by the orphan
 *
2426  * \retval       +1: repaired
2427  * \retval        0: did nothing
2428  * \retval      -ve: on error
2429  */
2430 static int lfsck_layout_recreate_lovea(const struct lu_env *env,
2431                                        struct lfsck_component *com,
2432                                        struct lfsck_tgt_desc *ltd,
2433                                        struct lu_orphan_rec *rec,
2434                                        struct dt_object *parent,
2435                                        struct lu_fid *cfid,
2436                                        __u32 ost_idx, __u32 ea_off)
2437 {
2438         struct lfsck_thread_info *info          = lfsck_env_info(env);
2439         struct lu_buf            *buf           = &info->lti_big_buf;
2440         struct lu_fid            *fid           = &info->lti_fid2;
2441         struct ost_id            *oi            = &info->lti_oi;
2442         struct lfsck_instance    *lfsck         = com->lc_lfsck;
2443         struct dt_device         *dt            = lfsck->li_bottom;
2444         struct lfsck_bookmark    *bk            = &lfsck->li_bookmark_ram;
2445         struct thandle            *handle       = NULL;
             /* buflen remembers the real length of the shared buffer so that
              * buf->lb_len can be restored before leaving; lb_len is shrunk
              * to the EA size while the EA is being written. */
2446         size_t                    buflen        = buf->lb_len;
2447         struct lov_mds_md_v1     *lmm;
2448         struct lov_ost_data_v1   *objs;
2449         struct lustre_handle      lh            = { 0 };
2450         __u32                     magic;
2451         int                       fl            = 0;
2452         int                       rc            = 0;
2453         int                       rc1;
2454         int                       i;
2455         __u16                     count;
2456         bool                      locked        = false;
2457         ENTRY;
2458
2459         rc = lfsck_layout_lock(env, com, parent, &lh,
2460                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2461         if (rc != 0) {
2462                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to recreate "
2463                        "LOV EA for "DFID": parent "DFID", OST-index %u, "
2464                        "stripe-index %u: rc = %d\n",
2465                        lfsck_lfsck2name(lfsck), PFID(cfid),
2466                        PFID(lfsck_dto2fid(parent)), ost_idx, ea_off, rc);
2467
2468                 RETURN(rc);
2469         }
2470
             /* Retry point. On (re-)entry rc is 0 (first pass), a negative
              * error, or the buffer size required for the next attempt. Any
              * parent lock and transaction left over from the previous
              * attempt are dropped first. */
2471 again:
2472         if (locked) {
2473                 dt_write_unlock(env, parent);
2474                 locked = false;
2475         }
2476
2477         if (handle != NULL) {
2478                 dt_trans_stop(env, dt, handle);
2479                 handle = NULL;
2480         }
2481
2482         if (rc < 0)
2483                 GOTO(unlock_layout, rc);
2484
             /* rc > lb_len: the previous attempt found the buffer too small. */
2485         if (buf->lb_len < rc) {
2486                 lu_buf_realloc(buf, rc);
2487                 buflen = buf->lb_len;
2488                 if (buf->lb_buf == NULL)
2489                         GOTO(unlock_layout, rc = -ENOMEM);
2490         }
2491
             /* In dry-run mode no transaction is created, so handle stays
              * NULL; the cleanup paths below check for that. */
2492         if (!(bk->lb_param & LPF_DRYRUN)) {
2493                 handle = dt_trans_create(env, dt);
2494                 if (IS_ERR(handle))
2495                         GOTO(unlock_layout, rc = PTR_ERR(handle));
2496
2497                 rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
2498                                           fl, handle);
2499                 if (rc != 0)
2500                         GOTO(stop, rc);
2501
2502                 rc = dt_trans_start_local(env, dt, handle);
2503                 if (rc != 0)
2504                         GOTO(stop, rc);
2505         }
2506
             /* The parent is write-locked only after the transaction has been
              * started; 'locked' tells the retry and cleanup code whether the
              * lock has to be released. */
2507         dt_write_lock(env, parent, 0);
2508         locked = true;
2509         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
2510         if (rc == -ERANGE) {
                     /* Buffer too small: ask for the EA size with a NULL
                      * buffer and retry with rc set to that result. */
2511                 rc = dt_xattr_get(env, parent, &LU_BUF_NULL, XATTR_NAME_LOV,
2512                                   BYPASS_CAPA);
2513                 LASSERT(rc != 0);
2514                 goto again;
2515         } else if (rc == -ENODATA || rc == 0) {
                     /* No usable LOV EA: a V1 EA with ea_off + 1 stripes
                      * will be created. */
2516                 rc = lov_mds_md_size(ea_off + 1, LOV_MAGIC_V1);
2517                 /* If the declared is not big enough, re-try. */
2518                 if (buf->lb_len < rc)
2519                         goto again;
2520
2521                 fl = LU_XATTR_CREATE;
2522         } else if (rc < 0) {
2523                 GOTO(unlock_parent, rc);
2524         } else if (unlikely(buf->lb_len == 0)) {
                     /* Positive rc with an empty buffer: retry so that the
                      * buffer gets (re)allocated to rc bytes above. */
2525                 goto again;
2526         } else {
                     /* rc is the size of the existing LOV EA now held in buf. */
2527                 fl = LU_XATTR_REPLACE;
2528         }
2529
2530         if (fl == LU_XATTR_CREATE) {
2531                 if (bk->lb_param & LPF_DRYRUN)
2532                         GOTO(unlock_parent, rc = 1);
2533
2534                 LASSERT(buf->lb_len >= rc);
2535
2536                 buf->lb_len = rc;
2537                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2538                                                buf, fl, ost_idx, ea_off, false);
2539
2540                 GOTO(unlock_parent, rc);
2541         }
2542
             /* rc1 keeps the header verdict separate so that rc continues to
              * hold the size of the existing EA. */
2543         lmm = buf->lb_buf;
2544         rc1 = lfsck_layout_verify_header(lmm);
2545
2546         /* If the LOV EA crashed, then rebuild it. */
2547         if (rc1 == -EINVAL) {
2548                 if (bk->lb_param & LPF_DRYRUN)
2549                         GOTO(unlock_parent, rc = 1);
2550
2551                 LASSERT(buf->lb_len >= rc);
2552
2553                 buf->lb_len = rc;
2554                 memset(lmm, 0, buf->lb_len);
2555                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2556                                                buf, fl, ost_idx, ea_off, true);
2557
2558                 GOTO(unlock_parent, rc);
2559         }
2560
2561         /* For other unknown magic/pattern, keep the current LOV EA. */
2562         if (rc1 != 0)
2563                 GOTO(unlock_parent, rc = rc1);
2564
2565         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
2566          * been verified in lfsck_layout_verify_header() already. If some
2567          * new magic introduced in the future, then layout LFSCK needs to
2568          * be updated also. */
2569         magic = le32_to_cpu(lmm->lmm_magic);
2570         if (magic == LOV_MAGIC_V1) {
2571                 objs = &lmm->lmm_objects[0];
2572         } else {
2573                 LASSERT(magic == LOV_MAGIC_V3);
2574                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
2575         }
2576
2577         count = le16_to_cpu(lmm->lmm_stripe_count);
2578         if (count == 0)
2579                 GOTO(unlock_parent, rc = -EINVAL);
2580         LASSERT(count > 0);
2581
2582         /* Exceed the current end of MDT-object layout EA. Then extend it. */
2583         if (count <= ea_off) {
2584                 if (bk->lb_param & LPF_DRYRUN)
2585                         GOTO(unlock_parent, rc = 1);
2586
2587                 rc = lov_mds_md_size(ea_off + 1, magic);
2588                 /* If the declared is not big enough, re-try. */
2589                 if (buf->lb_len < rc)
2590                         goto again;
2591
2592                 buf->lb_len = rc;
2593                 rc = lfsck_layout_extend_lovea(env, lfsck, handle, parent, cfid,
2594                                                buf, fl, ost_idx, ea_off, false);
2595
2596                 GOTO(unlock_parent, rc);
2597         }
2598
             /* rc has not been reassigned since the dt_xattr_get() above, so
              * it is still the size of the existing EA. */
2599         LASSERTF(rc > 0, "invalid rc = %d\n", rc);
2600
2601         buf->lb_len = rc;
             /* Walk all existing slots: look for a dummy slot at ea_off, or
              * for a slot that already references cfid. */
2602         for (i = 0; i < count; i++, objs++) {
2603                 /* The MDT-object was created via lfsck_layout_recover_create()
2604                  * by others before, and we fill the dummy layout EA. */
2605                 if (lovea_slot_is_dummy(objs)) {
2606                         if (i != ea_off)
2607                                 continue;
2608
2609                         if (bk->lb_param & LPF_DRYRUN)
2610                                 GOTO(unlock_parent, rc = 1);
2611
2612                         lmm->lmm_layout_gen =
2613                             cpu_to_le16(le16_to_cpu(lmm->lmm_layout_gen) + 1);
2614                         rc = lfsck_layout_refill_lovea(env, handle, parent,
2615                                                        cfid, buf, objs, fl,
2616                                                        ost_idx);
2617
2618                         CDEBUG(D_LFSCK, "%s layout LFSCK assistant fill "
2619                                "dummy layout slot for "DFID": parent "DFID
2620                                ", OST-index %u, stripe-index %u: rc = %d\n",
2621                                lfsck_lfsck2name(lfsck), PFID(cfid),
2622                                PFID(lfsck_dto2fid(parent)), ost_idx, i, rc);
2623
2624                         GOTO(unlock_parent, rc);
2625                 }
2626
2627                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
2628                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
2629                 /* It should be rare case, the slot is there, but the LFSCK
2630                  * does not handle it during the first-phase cycle scanning. */
2631                 if (unlikely(lu_fid_eq(fid, cfid))) {
2632                         if (i == ea_off) {
2633                                 GOTO(unlock_parent, rc = 0);
2634                         } else {
2635                                 /* Rare case that the OST-object index
2636                                  * does not match the parent MDT-object
2637                                  * layout EA. We trust the later one. */
2638                                 if (bk->lb_param & LPF_DRYRUN)
2639                                         GOTO(unlock_parent, rc = 1);
2640
                                     /* This path returns directly, so the
                                      * parent lock, the transaction, the
                                      * layout lock and buf->lb_len are all
                                      * released/restored by hand here. */
2641                                 dt_write_unlock(env, parent);
2642                                 if (handle != NULL)
2643                                         dt_trans_stop(env, dt, handle);
2644                                 lfsck_layout_unlock(&lh);
2645                                 buf->lb_len = buflen;
2646                                 rc = lfsck_layout_update_pfid(env, com, parent,
2647                                                         cfid, ltd->ltd_tgt, i);
2648
2649                                 CDEBUG(D_LFSCK, "%s layout LFSCK assistant "
2650                                        "updated OST-object's pfid for "DFID
2651                                        ": parent "DFID", OST-index %u, "
2652                                        "stripe-index %u: rc = %d\n",
2653                                        lfsck_lfsck2name(lfsck), PFID(cfid),
2654                                        PFID(lfsck_dto2fid(parent)),
2655                                        ltd->ltd_index, i, rc);
2656
2657                                 RETURN(rc);
2658                         }
2659                 }
2660         }
2661
2662         /* The MDT-object exists, but related layout EA slot is occupied
2663          * by others. */
2664         if (bk->lb_param & LPF_DRYRUN)
2665                 GOTO(unlock_parent, rc = 1);
2666
             /* Everything is released by hand because this path returns
              * directly. lmm points into buf, the in-memory copy of the EA,
              * so it is still readable after the locks are dropped. buf keeps
              * lb_len == EA size; buflen is passed along as the length that
              * lfsck_layout_conflict_create() restores into it. */
2667         dt_write_unlock(env, parent);
2668         if (handle != NULL)
2669                 dt_trans_stop(env, dt, handle);
2670         lfsck_layout_unlock(&lh);
2671         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1)
2672                 objs = &lmm->lmm_objects[ea_off];
2673         else
2674                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[ea_off];
2675         rc = lfsck_layout_conflict_create(env, com, ltd, rec, parent, cfid,
2676                                           buf, objs, ea_off, buflen);
2677
2678         RETURN(rc);
2679
             /* Common cleanup, in reverse order of acquisition. */
2680 unlock_parent:
2681         if (locked)
2682                 dt_write_unlock(env, parent);
2683
2684 stop:
2685         if (handle != NULL)
2686                 dt_trans_stop(env, dt, handle);
2687
2688 unlock_layout:
2689         lfsck_layout_unlock(&lh);
2690         buf->lb_len = buflen;
2691
2692         return rc;
2693 }
2694
2695 static int lfsck_layout_scan_orphan_one(const struct lu_env *env,
2696                                         struct lfsck_component *com,
2697                                         struct lfsck_tgt_desc *ltd,
2698                                         struct lu_orphan_rec *rec,
2699                                         struct lu_fid *cfid)
2700 {
2701         struct lfsck_layout     *lo     = com->lc_file_ram;
2702         struct lu_fid           *pfid   = &rec->lor_fid;
2703         struct dt_object        *parent = NULL;
2704         __u32                    ea_off = pfid->f_stripe_idx;
2705         int                      rc     = 0;
2706         ENTRY;
2707
2708         if (!fid_is_sane(cfid))
2709                 GOTO(out, rc = -EINVAL);
2710
2711         if (fid_is_zero(pfid)) {
2712                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2713                                                   "", "N", ea_off);
2714                 GOTO(out, rc);
2715         }
2716
2717         pfid->f_ver = 0;
2718         if (!fid_is_sane(pfid))
2719                 GOTO(out, rc = -EINVAL);
2720
2721         parent = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
2722         if (IS_ERR(parent))
2723                 GOTO(out, rc = PTR_ERR(parent));
2724
2725         if (unlikely(dt_object_remote(parent) != 0))
2726                 GOTO(put, rc = -EXDEV);
2727
2728         if (dt_object_exists(parent) == 0) {
2729                 lu_object_put(env, &parent->do_lu);
2730                 rc = lfsck_layout_recreate_parent(env, com, ltd, rec, cfid,
2731                                                   "", "R", ea_off);
2732                 GOTO(out, rc);
2733         }
2734
2735         if (!S_ISREG(lu_object_attr(&parent->do_lu)))
2736                 GOTO(put, rc = -EISDIR);
2737
2738         rc = lfsck_layout_recreate_lovea(env, com, ltd, rec, parent, cfid,
2739                                          ltd->ltd_index, ea_off);
2740
2741         GOTO(put, rc);
2742
2743 put:
2744         if (rc <= 0)
2745                 lu_object_put(env, &parent->do_lu);
2746         else
2747                 /* The layout EA is changed, need to be reloaded next time. */
2748                 lu_object_put_nocache(env, &parent->do_lu);
2749
2750 out:
2751         down_write(&com->lc_sem);
2752         com->lc_new_scanned++;
2753         com->lc_new_checked++;
2754         if (rc > 0) {
2755                 lo->ll_objs_repaired[LLIT_ORPHAN - 1]++;
2756                 rc = 0;
2757         } else if (rc < 0) {
2758                 lo->ll_objs_failed_phase2++;
2759         }
2760         up_write(&com->lc_sem);
2761
2762         return rc;
2763 }
2764
2765 static int lfsck_layout_scan_orphan(const struct lu_env *env,
2766                                     struct lfsck_component *com,
2767                                     struct lfsck_tgt_desc *ltd)
2768 {
2769         struct lfsck_layout             *lo     = com->lc_file_ram;
2770         struct lfsck_instance           *lfsck  = com->lc_lfsck;
2771         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
2772         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2773         struct ost_id                   *oi     = &info->lti_oi;
2774         struct lu_fid                   *fid    = &info->lti_fid;
2775         struct dt_object                *obj;
2776         const struct dt_it_ops          *iops;
2777         struct dt_it                    *di;
2778         int                              rc     = 0;
2779         ENTRY;
2780
2781         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant starts the orphan "
2782                "scanning for OST%04x\n",
2783                lfsck_lfsck2name(lfsck), ltd->ltd_index);
2784
2785         ostid_set_seq(oi, FID_SEQ_IDIF);
2786         ostid_set_id(oi, 0);
2787         ostid_to_fid(fid, oi, ltd->ltd_index);
2788         obj = lfsck_object_find_by_dev(env, ltd->ltd_tgt, fid);
2789         if (unlikely(IS_ERR(obj)))
2790                 GOTO(log, rc = PTR_ERR(obj));
2791
2792         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_orphan_features);
2793         if (rc != 0)
2794                 GOTO(put, rc);
2795
2796         iops = &obj->do_index_ops->dio_it;
2797         di = iops->init(env, obj, 0, BYPASS_CAPA);
2798         if (IS_ERR(di))
2799                 GOTO(put, rc = PTR_ERR(di));
2800
2801         rc = iops->load(env, di, 0);
2802         if (rc == -ESRCH) {
2803                 /* -ESRCH means that the orphan OST-objects rbtree has been
2804                  * cleanup because of the OSS server restart or other errors. */
2805                 lo->ll_flags |= LF_INCOMPLETE;
2806                 GOTO(fini, rc);
2807         }
2808
2809         if (rc == 0)
2810                 rc = iops->next(env, di);
2811         else if (rc > 0)
2812                 rc = 0;
2813
2814         if (rc < 0)
2815                 GOTO(fini, rc);
2816
2817         if (rc > 0)
2818                 GOTO(fini, rc = 0);
2819
2820         do {
2821                 struct dt_key           *key;
2822                 struct lu_orphan_rec    *rec = &info->lti_rec;
2823
2824                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2825                     cfs_fail_val > 0) {
2826                         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2827                         struct l_wait_info       lwi;
2828
2829                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2830                                           NULL, NULL);
2831                         l_wait_event(thread->t_ctl_waitq,
2832                                      !thread_is_running(thread),
2833                                      &lwi);
2834                 }
2835
2836                 key = iops->key(env, di);
2837                 com->lc_fid_latest_scanned_phase2 = *(struct lu_fid *)key;
2838                 rc = iops->rec(env, di, (struct dt_rec *)rec, 0);
2839                 if (rc == 0)
2840                         rc = lfsck_layout_scan_orphan_one(env, com, ltd, rec,
2841                                         &com->lc_fid_latest_scanned_phase2);
2842                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
2843                         GOTO(fini, rc);
2844
2845                 lfsck_control_speed_by_self(com);
2846                 do {
2847                         rc = iops->next(env, di);
2848                 } while (rc < 0 && !(bk->lb_param & LPF_FAILOUT));
2849         } while (rc == 0);
2850
2851         GOTO(fini, rc);
2852
2853 fini:
2854         iops->put(env, di);
2855         iops->fini(env, di);
2856 put:
2857         lu_object_put(env, &obj->do_lu);
2858
2859 log:
2860         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant finished the orphan "
2861                "scanning for OST%04x: rc = %d\n",
2862                lfsck_lfsck2name(lfsck), ltd->ltd_index, rc);
2863
2864         return rc > 0 ? 0 : rc;
2865 }
2866
2867 /* For the MDT-object with dangling reference, we need to repare the
2868  * inconsistency according to the LFSCK sponsor's requirement:
2869  *
2870  * 1) Keep the inconsistency there and report the inconsistency case,
2871  *    then give the chance to the application to find related issues,
2872  *    and the users can make the decision about how to handle it with
2873  *    more human knownledge. (by default)
2874  *
2875  * 2) Re-create the missed OST-object with the FID/owner information. */
2876 static int lfsck_layout_repair_dangling(const struct lu_env *env,
2877                                         struct lfsck_component *com,
2878                                         struct lfsck_layout_req *llr,
2879                                         const struct lu_attr *pla)
2880 {
2881         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2882         struct filter_fid               *pfid   = &info->lti_new_pfid;
2883         struct dt_allocation_hint       *hint   = &info->lti_hint;
2884         struct lu_attr                  *cla    = &info->lti_la2;
2885         struct dt_object                *parent = llr->llr_parent->llo_obj;
2886         struct dt_object                *child  = llr->llr_child;
2887         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2888         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2889         struct thandle                  *handle;
2890         struct lu_buf                   *buf;
2891         struct lustre_handle             lh     = { 0 };
2892         int                              rc;
2893         bool                             create;
2894         ENTRY;
2895
2896         if (com->lc_lfsck->li_bookmark_ram.lb_param & LPF_CREATE_OSTOBJ)
2897                 create = true;
2898         else
2899                 create = false;
2900
2901         if (!create)
2902                 GOTO(log, rc = 1);
2903
2904         memset(cla, 0, sizeof(*cla));
2905         cla->la_uid = pla->la_uid;
2906         cla->la_gid = pla->la_gid;
2907         cla->la_mode = S_IFREG | 0666;
2908         cla->la_valid = LA_TYPE | LA_MODE | LA_UID | LA_GID |
2909                         LA_ATIME | LA_MTIME | LA_CTIME;
2910
2911         rc = lfsck_layout_lock(env, com, parent, &lh,
2912                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
2913         if (rc != 0)
2914                 GOTO(log, rc);
2915
2916         handle = dt_trans_create(env, dev);
2917         if (IS_ERR(handle))
2918                 GOTO(unlock1, rc = PTR_ERR(handle));
2919
2920         hint->dah_parent = NULL;
2921         hint->dah_mode = 0;
2922         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
2923         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
2924         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
2925          * MDT-object's FID::f_ver, instead it is the OST-object index in its
2926          * parent MDT-object's layout EA. */
2927         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
2928         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
2929
2930         rc = dt_declare_create(env, child, cla, hint, NULL, handle);
2931         if (rc != 0)
2932                 GOTO(stop, rc);
2933
2934         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID,
2935                                   LU_XATTR_CREATE, handle);
2936         if (rc != 0)
2937                 GOTO(stop, rc);
2938
2939         rc = dt_trans_start(env, dev, handle);
2940         if (rc != 0)
2941                 GOTO(stop, rc);
2942
2943         dt_read_lock(env, parent, 0);
2944         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
2945                 GOTO(unlock2, rc = 1);
2946
2947         rc = dt_create(env, child, cla, hint, NULL, handle);
2948         if (rc != 0)
2949                 GOTO(unlock2, rc);
2950
2951         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, LU_XATTR_CREATE,
2952                           handle, BYPASS_CAPA);
2953
2954         GOTO(unlock2, rc);
2955
2956 unlock2:
2957         dt_read_unlock(env, parent);
2958
2959 stop:
2960         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
2961
2962 unlock1:
2963         lfsck_layout_unlock(&lh);
2964
2965 log:
2966         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant found dangling "
2967                "reference for: parent "DFID", child "DFID", OST-index %u, "
2968                "stripe-index %u, owner %u/%u. %s: rc = %d\n",
2969                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
2970                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx,
2971                llr->llr_lov_idx, pla->la_uid, pla->la_gid,
2972                create ? "Create the lost OST-object as required" :
2973                         "Keep the MDT-object there by default", rc);
2974
2975         return rc;
2976 }
2977
2978 /* If the OST-object does not recognize the MDT-object as its parent, and
2979  * there is no other MDT-object claims as its parent, then just trust the
2980  * given MDT-object as its parent. So update the OST-object filter_fid. */
2981 static int lfsck_layout_repair_unmatched_pair(const struct lu_env *env,
2982                                               struct lfsck_component *com,
2983                                               struct lfsck_layout_req *llr,
2984                                               const struct lu_attr *pla)
2985 {
2986         struct lfsck_thread_info        *info   = lfsck_env_info(env);
2987         struct filter_fid               *pfid   = &info->lti_new_pfid;
2988         struct lu_attr                  *tla    = &info->lti_la3;
2989         struct dt_object                *parent = llr->llr_parent->llo_obj;
2990         struct dt_object                *child  = llr->llr_child;
2991         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
2992         const struct lu_fid             *tfid   = lu_object_fid(&parent->do_lu);
2993         struct thandle                  *handle;
2994         struct lu_buf                   *buf;
2995         struct lustre_handle             lh     = { 0 };
2996         int                              rc;
2997         ENTRY;
2998
2999         rc = lfsck_layout_lock(env, com, parent, &lh,
3000                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3001         if (rc != 0)
3002                 GOTO(log, rc);
3003
3004         handle = dt_trans_create(env, dev);
3005         if (IS_ERR(handle))
3006                 GOTO(unlock1, rc = PTR_ERR(handle));
3007
3008         pfid->ff_parent.f_seq = cpu_to_le64(tfid->f_seq);
3009         pfid->ff_parent.f_oid = cpu_to_le32(tfid->f_oid);
3010         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
3011          * MDT-object's FID::f_ver, instead it is the OST-object index in its
3012          * parent MDT-object's layout EA. */
3013         pfid->ff_parent.f_stripe_idx = cpu_to_le32(llr->llr_lov_idx);
3014         buf = lfsck_buf_get(env, pfid, sizeof(struct filter_fid));
3015
3016         rc = dt_declare_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle);
3017         if (rc != 0)
3018                 GOTO(stop, rc);
3019
3020         tla->la_valid = LA_UID | LA_GID;
3021         tla->la_uid = pla->la_uid;
3022         tla->la_gid = pla->la_gid;
3023         rc = dt_declare_attr_set(env, child, tla, handle);
3024         if (rc != 0)
3025                 GOTO(stop, rc);
3026
3027         rc = dt_trans_start(env, dev, handle);
3028         if (rc != 0)
3029                 GOTO(stop, rc);
3030
3031         dt_write_lock(env, parent, 0);
3032         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3033                 GOTO(unlock2, rc = 1);
3034
3035         rc = dt_xattr_set(env, child, buf, XATTR_NAME_FID, 0, handle,
3036                           BYPASS_CAPA);
3037         if (rc != 0)
3038                 GOTO(unlock2, rc);
3039
3040         /* Get the latest parent's owner. */
3041         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3042         if (rc != 0)
3043                 GOTO(unlock2, rc);
3044
3045         tla->la_valid = LA_UID | LA_GID;
3046         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3047
3048         GOTO(unlock2, rc);
3049
3050 unlock2:
3051         dt_write_unlock(env, parent);
3052
3053 stop:
3054         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3055
3056 unlock1:
3057         lfsck_layout_unlock(&lh);
3058
3059 log:
3060         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired unmatched "
3061                "MDT-OST pair for: parent "DFID", child "DFID", OST-index %u, "
3062                "stripe-index %u, owner %u/%u: rc = %d\n",
3063                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3064                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3065                pla->la_uid, pla->la_gid, rc);
3066
3067         return rc;
3068 }
3069
3070 /* If there are more than one MDT-objects claim as the OST-object's parent,
3071  * and the OST-object only recognizes one of them, then we need to generate
3072  * new OST-object(s) with new fid(s) for the non-recognized MDT-object(s). */
3073 static int lfsck_layout_repair_multiple_references(const struct lu_env *env,
3074                                                    struct lfsck_component *com,
3075                                                    struct lfsck_layout_req *llr,
3076                                                    struct lu_attr *la,
3077                                                    struct lu_buf *buf)
3078 {
3079         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3080         struct dt_allocation_hint       *hint   = &info->lti_hint;
3081         struct dt_object_format         *dof    = &info->lti_dof;
3082         struct dt_device                *pdev   = com->lc_lfsck->li_next;
3083         struct ost_id                   *oi     = &info->lti_oi;
3084         struct dt_object                *parent = llr->llr_parent->llo_obj;
3085         struct dt_device                *cdev   = lfsck_obj2dt_dev(llr->llr_child);
3086         struct dt_object                *child  = NULL;
3087         struct lu_device                *d      = &cdev->dd_lu_dev;
3088         struct lu_object                *o      = NULL;
3089         struct thandle                  *handle;
3090         struct lov_mds_md_v1            *lmm;
3091         struct lov_ost_data_v1          *objs;
3092         struct lustre_handle             lh     = { 0 };
3093         __u32                            magic;
3094         int                              rc;
3095         ENTRY;
3096
3097         rc = lfsck_layout_lock(env, com, parent, &lh,
3098                                MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
3099         if (rc != 0)
3100                 GOTO(log, rc);
3101
3102         handle = dt_trans_create(env, pdev);
3103         if (IS_ERR(handle))
3104                 GOTO(unlock1, rc = PTR_ERR(handle));
3105
3106         o = lu_object_anon(env, d, NULL);
3107         if (IS_ERR(o))
3108                 GOTO(stop, rc = PTR_ERR(o));
3109
3110         child = container_of(o, struct dt_object, do_lu);
3111         o = lu_object_locate(o->lo_header, d->ld_type);
3112         if (unlikely(o == NULL))
3113                 GOTO(stop, rc = -EINVAL);
3114
3115         child = container_of(o, struct dt_object, do_lu);
3116         la->la_valid = LA_UID | LA_GID;
3117         hint->dah_parent = NULL;
3118         hint->dah_mode = 0;
3119         dof->dof_type = DFT_REGULAR;
3120         rc = dt_declare_create(env, child, la, NULL, NULL, handle);
3121         if (rc != 0)
3122                 GOTO(stop, rc);
3123
3124         rc = dt_declare_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3125                                   LU_XATTR_REPLACE, handle);
3126         if (rc != 0)
3127                 GOTO(stop, rc);
3128
3129         rc = dt_trans_start(env, pdev, handle);
3130         if (rc != 0)
3131                 GOTO(stop, rc);
3132
3133         dt_write_lock(env, parent, 0);
3134         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3135                 GOTO(unlock2, rc = 0);
3136
3137         rc = dt_xattr_get(env, parent, buf, XATTR_NAME_LOV, BYPASS_CAPA);
3138         if (unlikely(rc == 0 || rc == -ENODATA || rc == -ERANGE))
3139                 GOTO(unlock2, rc = 0);
3140
3141         lmm = buf->lb_buf;
3142         /* Someone change layout during the LFSCK, no need to repair then. */
3143         if (le16_to_cpu(lmm->lmm_layout_gen) != llr->llr_parent->llo_gen)
3144                 GOTO(unlock2, rc = 0);
3145
3146         rc = dt_create(env, child, la, hint, dof, handle);
3147         if (rc != 0)
3148                 GOTO(unlock2, rc);
3149
3150         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
3151          * been verified in lfsck_layout_verify_header() already. If some
3152          * new magic introduced in the future, then layout LFSCK needs to
3153          * be updated also. */
3154         magic = le32_to_cpu(lmm->lmm_magic);
3155         if (magic == LOV_MAGIC_V1) {
3156                 objs = &lmm->lmm_objects[0];
3157         } else {
3158                 LASSERT(magic == LOV_MAGIC_V3);
3159                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3160         }
3161
3162         lmm->lmm_layout_gen = cpu_to_le16(llr->llr_parent->llo_gen + 1);
3163         fid_to_ostid(lu_object_fid(&child->do_lu), oi);
3164         ostid_cpu_to_le(oi, &objs[llr->llr_lov_idx].l_ost_oi);
3165         objs[llr->llr_lov_idx].l_ost_gen = cpu_to_le32(0);
3166         objs[llr->llr_lov_idx].l_ost_idx = cpu_to_le32(llr->llr_ost_idx);
3167         rc = dt_xattr_set(env, parent, buf, XATTR_NAME_LOV,
3168                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
3169
3170         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
3171
3172 unlock2:
3173         dt_write_unlock(env, parent);
3174
3175 stop:
3176         if (child != NULL)
3177                 lu_object_put(env, &child->do_lu);
3178
3179         dt_trans_stop(env, pdev, handle);
3180
3181 unlock1:
3182         lfsck_layout_unlock(&lh);
3183
3184 log:
3185         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired multiple "
3186                "references for: parent "DFID", OST-index %u, stripe-index %u, "
3187                "owner %u/%u: rc = %d\n",
3188                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3189                llr->llr_ost_idx, llr->llr_lov_idx, la->la_uid, la->la_gid, rc);
3190
3191         return rc;
3192 }
3193
3194 /* If the MDT-object and the OST-object have different owner information,
3195  * then trust the MDT-object, because the normal chown/chgrp handle order
3196  * is from MDT to OST, and it is possible that some chown/chgrp operation
3197  * is partly done. */
3198 static int lfsck_layout_repair_owner(const struct lu_env *env,
3199                                      struct lfsck_component *com,
3200                                      struct lfsck_layout_req *llr,
3201                                      struct lu_attr *pla)
3202 {
3203         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3204         struct lu_attr                  *tla    = &info->lti_la3;
3205         struct dt_object                *parent = llr->llr_parent->llo_obj;
3206         struct dt_object                *child  = llr->llr_child;
3207         struct dt_device                *dev    = lfsck_obj2dt_dev(child);
3208         struct thandle                  *handle;
3209         int                              rc;
3210         ENTRY;
3211
3212         handle = dt_trans_create(env, dev);
3213         if (IS_ERR(handle))
3214                 GOTO(log, rc = PTR_ERR(handle));
3215
3216         tla->la_uid = pla->la_uid;
3217         tla->la_gid = pla->la_gid;
3218         tla->la_valid = LA_UID | LA_GID;
3219         rc = dt_declare_attr_set(env, child, tla, handle);
3220         if (rc != 0)
3221                 GOTO(stop, rc);
3222
3223         rc = dt_trans_start(env, dev, handle);
3224         if (rc != 0)
3225                 GOTO(stop, rc);
3226
3227         /* Use the dt_object lock to serialize with destroy and attr_set. */
3228         dt_read_lock(env, parent, 0);
3229         if (unlikely(lu_object_is_dying(parent->do_lu.lo_header)))
3230                 GOTO(unlock, rc = 1);
3231
3232         /* Get the latest parent's owner. */
3233         rc = dt_attr_get(env, parent, tla, BYPASS_CAPA);
3234         if (rc != 0)
3235                 GOTO(unlock, rc);
3236
3237         /* Some others chown/chgrp during the LFSCK, needs to do nothing. */
3238         if (unlikely(tla->la_uid != pla->la_uid ||
3239                      tla->la_gid != pla->la_gid))
3240                 GOTO(unlock, rc = 1);
3241
3242         tla->la_valid = LA_UID | LA_GID;
3243         rc = dt_attr_set(env, child, tla, handle, BYPASS_CAPA);
3244
3245         GOTO(unlock, rc);
3246
3247 unlock:
3248         dt_read_unlock(env, parent);
3249
3250 stop:
3251         rc = lfsck_layout_trans_stop(env, dev, handle, rc);
3252
3253 log:
3254         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant repaired inconsistent "
3255                "file owner for: parent "DFID", child "DFID", OST-index %u, "
3256                "stripe-index %u, owner %u/%u: rc = %d\n",
3257                lfsck_lfsck2name(com->lc_lfsck), PFID(lfsck_dto2fid(parent)),
3258                PFID(lfsck_dto2fid(child)), llr->llr_ost_idx, llr->llr_lov_idx,
3259                pla->la_uid, pla->la_gid, rc);
3260
3261         return rc;
3262 }
3263
3264 /* Check whether the OST-object correctly back points to the
3265  * MDT-object (@parent) via the XATTR_NAME_FID xattr (@pfid). */
3266 static int lfsck_layout_check_parent(const struct lu_env *env,
3267                                      struct lfsck_component *com,
3268                                      struct dt_object *parent,
3269                                      const struct lu_fid *pfid,
3270                                      const struct lu_fid *cfid,
3271                                      const struct lu_attr *pla,
3272                                      const struct lu_attr *cla,
3273                                      struct lfsck_layout_req *llr,
3274                                      struct lu_buf *lov_ea, __u32 idx)
3275 {
3276         struct lfsck_thread_info        *info   = lfsck_env_info(env);
3277         struct lu_buf                   *buf    = &info->lti_big_buf;
3278         struct dt_object                *tobj;
3279         struct lov_mds_md_v1            *lmm;
3280         struct lov_ost_data_v1          *objs;
3281         int                              rc;
3282         int                              i;
3283         __u32                            magic;
3284         __u16                            count;
3285         ENTRY;
3286
3287         if (fid_is_zero(pfid)) {
3288                 /* client never wrote. */
3289                 if (cla->la_size == 0 && cla->la_blocks == 0) {
3290                         if (unlikely(cla->la_uid != pla->la_uid ||
3291                                      cla->la_gid != pla->la_gid))
3292                                 RETURN (LLIT_INCONSISTENT_OWNER);
3293
3294                         RETURN(0);
3295                 }
3296
3297                 RETURN(LLIT_UNMATCHED_PAIR);
3298         }
3299
3300         if (unlikely(!fid_is_sane(pfid)))
3301                 RETURN(LLIT_UNMATCHED_PAIR);
3302
3303         if (lu_fid_eq(pfid, lu_object_fid(&parent->do_lu))) {
3304                 if (llr->llr_lov_idx == idx)
3305                         RETURN(0);
3306
3307                 RETURN(LLIT_UNMATCHED_PAIR);
3308         }
3309
3310         tobj = lfsck_object_find(env, com->lc_lfsck, pfid);
3311         if (tobj == NULL)
3312                 RETURN(LLIT_UNMATCHED_PAIR);
3313
3314         if (IS_ERR(tobj))
3315                 RETURN(PTR_ERR(tobj));
3316
3317         if (!dt_object_exists(tobj))
3318                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3319
3320         /* Load the tobj's layout EA, in spite of it is a local MDT-object or
3321          * remote one on another MDT. Then check whether the given OST-object
3322          * is in such layout. If yes, it is multiple referenced, otherwise it
3323          * is unmatched referenced case. */
3324         rc = lfsck_layout_get_lovea(env, tobj, buf, NULL);
3325         if (rc == 0)
3326                 GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3327
3328         if (rc < 0)
3329                 GOTO(out, rc);
3330
3331         lmm = buf->lb_buf;
3332         magic = le32_to_cpu(lmm->lmm_magic);
3333         if (magic == LOV_MAGIC_V1) {
3334                 objs = &lmm->lmm_objects[0];
3335         } else {
3336                 LASSERT(magic == LOV_MAGIC_V3);
3337                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
3338         }
3339
3340         count = le16_to_cpu(lmm->lmm_stripe_count);
3341         for (i = 0; i < count; i++, objs++) {
3342                 struct lu_fid           *tfid   = &info->lti_fid2;
3343                 struct ost_id           *oi     = &info->lti_oi;
3344
3345                 if (lovea_slot_is_dummy(objs))
3346                         continue;
3347
3348                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
3349                 ostid_to_fid(tfid, oi, le32_to_cpu(objs->l_ost_idx));
3350                 if (lu_fid_eq(cfid, tfid)) {
3351                         *lov_ea = *buf;
3352
3353                         GOTO(out, rc = LLIT_MULTIPLE_REFERENCED);
3354                 }
3355         }
3356
3357         GOTO(out, rc = LLIT_UNMATCHED_PAIR);
3358
3359 out:
3360         lfsck_object_put(env, tobj);
3361
3362         return rc;
3363 }
3364
3365 static int lfsck_layout_assistant_handle_one(const struct lu_env *env,
3366                                              struct lfsck_component *com,
3367                                              struct lfsck_layout_req *llr)
3368 {
3369         struct lfsck_layout                  *lo     = com->lc_file_ram;
3370         struct lfsck_thread_info             *info   = lfsck_env_info(env);
3371         struct filter_fid_old                *pea    = &info->lti_old_pfid;
3372         struct lu_fid                        *pfid   = &info->lti_fid;
3373         struct lu_buf                        *buf    = NULL;
3374         struct dt_object                     *parent = llr->llr_parent->llo_obj;
3375         struct dt_object                     *child  = llr->llr_child;
3376         struct lu_attr                       *pla    = &info->lti_la;
3377         struct lu_attr                       *cla    = &info->lti_la2;
3378         struct lfsck_instance                *lfsck  = com->lc_lfsck;
3379         struct lfsck_bookmark                *bk     = &lfsck->li_bookmark_ram;
3380         enum lfsck_layout_inconsistency_type  type   = LLIT_NONE;
3381         __u32                                 idx    = 0;
3382         int                                   rc;
3383         ENTRY;
3384
3385         rc = dt_attr_get(env, parent, pla, BYPASS_CAPA);
3386         if (rc != 0) {
3387                 if (lu_object_is_dying(parent->do_lu.lo_header))
3388                         RETURN(0);
3389
3390                 GOTO(out, rc);
3391         }
3392
3393         rc = dt_attr_get(env, child, cla, BYPASS_CAPA);
3394         if (rc == -ENOENT) {
3395                 if (lu_object_is_dying(parent->do_lu.lo_header))
3396                         RETURN(0);
3397
3398                 type = LLIT_DANGLING;
3399                 goto repair;
3400         }
3401
3402         if (rc != 0)
3403                 GOTO(out, rc);
3404
3405         buf = lfsck_buf_get(env, pea, sizeof(struct filter_fid_old));
3406         rc= dt_xattr_get(env, child, buf, XATTR_NAME_FID, BYPASS_CAPA);
3407         if (unlikely(rc >= 0 && rc != sizeof(struct filter_fid_old) &&
3408                      rc != sizeof(struct filter_fid))) {
3409                 type = LLIT_UNMATCHED_PAIR;
3410                 goto repair;
3411         }
3412
3413         if (rc < 0 && rc != -ENODATA)
3414                 GOTO(out, rc);
3415
3416         if (rc == -ENODATA) {
3417                 fid_zero(pfid);
3418         } else {
3419                 fid_le_to_cpu(pfid, &pea->ff_parent);
3420                 /* Currently, the filter_fid::ff_parent::f_ver is not the
3421                  * real parent MDT-object's FID::f_ver, instead it is the
3422                  * OST-object index in its parent MDT-object's layout EA. */
3423                 idx = pfid->f_stripe_idx;
3424                 pfid->f_ver = 0;
3425         }
3426
3427         rc = lfsck_layout_check_parent(env, com, parent, pfid,
3428                                        lu_object_fid(&child->do_lu),
3429                                        pla, cla, llr, buf, idx);
3430         if (rc > 0) {
3431                 type = rc;
3432                 goto repair;
3433         }
3434
3435         if (rc < 0)
3436                 GOTO(out, rc);
3437
3438         if (unlikely(cla->la_uid != pla->la_uid ||
3439                      cla->la_gid != pla->la_gid)) {
3440                 type = LLIT_INCONSISTENT_OWNER;
3441                 goto repair;
3442         }
3443
3444 repair:
3445         if (bk->lb_param & LPF_DRYRUN) {
3446                 if (type != LLIT_NONE)
3447                         GOTO(out, rc = 1);
3448                 else
3449                         GOTO(out, rc = 0);
3450         }
3451
3452         switch (type) {
3453         case LLIT_DANGLING:
3454                 rc = lfsck_layout_repair_dangling(env, com, llr, pla);
3455                 break;
3456         case LLIT_UNMATCHED_PAIR:
3457                 rc = lfsck_layout_repair_unmatched_pair(env, com, llr, pla);
3458                 break;
3459         case LLIT_MULTIPLE_REFERENCED:
3460                 rc = lfsck_layout_repair_multiple_references(env, com, llr,
3461                                                              pla, buf);
3462                 break;
3463         case LLIT_INCONSISTENT_OWNER:
3464                 rc = lfsck_layout_repair_owner(env, com, llr, pla);
3465                 break;
3466         default:
3467                 rc = 0;
3468                 break;
3469         }
3470
3471         GOTO(out, rc);
3472
3473 out:
3474         down_write(&com->lc_sem);
3475         if (rc < 0) {
3476                 struct lfsck_layout_master_data *llmd = com->lc_data;
3477
3478                 if (unlikely(llmd->llmd_exit)) {
3479                         rc = 0;
3480                 } else if (rc == -ENOTCONN || rc == -ESHUTDOWN ||
3481                            rc == -ETIMEDOUT || rc == -EHOSTDOWN ||
3482                            rc == -EHOSTUNREACH) {
3483                         /* If cannot touch the target server,
3484                          * mark the LFSCK as INCOMPLETE. */
3485                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant fail to "
3486                                "talk with OST %x: rc = %d\n",
3487                                lfsck_lfsck2name(lfsck), llr->llr_ost_idx, rc);
3488                         lo->ll_flags |= LF_INCOMPLETE;
3489                         lo->ll_objs_skipped++;
3490                         rc = 0;
3491                 } else {
3492                         lfsck_layout_record_failure(env, lfsck, lo);
3493                 }
3494         } else if (rc > 0) {
3495                 LASSERTF(type > LLIT_NONE && type <= LLIT_MAX,
3496                          "unknown type = %d\n", type);
3497
3498                 lo->ll_objs_repaired[type - 1]++;
3499                 if (bk->lb_param & LPF_DRYRUN &&
3500                     unlikely(lo->ll_pos_first_inconsistent == 0))
3501                         lo->ll_pos_first_inconsistent =
3502                         lfsck->li_obj_oit->do_index_ops->dio_it.store(env,
3503                                                         lfsck->li_di_oit);
3504         }
3505         up_write(&com->lc_sem);
3506
3507         return rc;
3508 }
3509
3510 static int lfsck_layout_assistant(void *args)
3511 {
3512         struct lfsck_thread_args        *lta     = args;
3513         struct lu_env                   *env     = &lta->lta_env;
3514         struct lfsck_component          *com     = lta->lta_com;
3515         struct lfsck_instance           *lfsck   = lta->lta_lfsck;
3516         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
3517         struct lfsck_position           *pos     = &com->lc_pos_start;
3518         struct lfsck_thread_info        *info    = lfsck_env_info(env);
3519         struct lfsck_request            *lr      = &info->lti_lr;
3520         struct lfsck_layout_master_data *llmd    = com->lc_data;
3521         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
3522         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
3523         struct lfsck_layout_req         *llr;
3524         struct l_wait_info               lwi     = { 0 };
3525         int                              rc      = 0;
3526         int                              rc1     = 0;
3527         ENTRY;
3528
3529         memset(lr, 0, sizeof(*lr));
3530         lr->lr_event = LE_START;
3531         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
3532                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ;
3533         lr->lr_speed = bk->lb_speed_limit;
3534         lr->lr_version = bk->lb_version;
3535         lr->lr_param = bk->lb_param;
3536         lr->lr_async_windows = bk->lb_async_windows;
3537         lr->lr_flags = LEF_TO_OST;
3538         if (pos->lp_oit_cookie <= 1)
3539                 lr->lr_param |= LPF_RESET;
3540
3541         rc = lfsck_layout_master_notify_others(env, com, lr);
3542         if (rc != 0) {
3543                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to notify "
3544                        "others for LFSCK start: rc = %d\n",
3545                        lfsck_lfsck2name(lfsck), rc);
3546                 GOTO(fini, rc);
3547         }
3548
3549         spin_lock(&llmd->llmd_lock);
3550         thread_set_flags(athread, SVC_RUNNING);
3551         spin_unlock(&llmd->llmd_lock);
3552         wake_up_all(&mthread->t_ctl_waitq);
3553
3554         while (1) {
3555                 while (!list_empty(&llmd->llmd_req_list)) {
3556                         bool wakeup = false;
3557
3558                         if (unlikely(llmd->llmd_exit ||
3559                                      !thread_is_running(mthread)))
3560                                 GOTO(cleanup1, rc = llmd->llmd_post_result);
3561
3562                         llr = list_entry(llmd->llmd_req_list.next,
3563                                          struct lfsck_layout_req,
3564                                          llr_list);
3565                         /* Only the lfsck_layout_assistant thread itself can
3566                          * remove the "llr" from the head of the list, LFSCK
3567                          * engine thread only inserts other new "lld" at the
3568                          * end of the list. So it is safe to handle current
3569                          * "llr" without the spin_lock. */
3570                         rc = lfsck_layout_assistant_handle_one(env, com, llr);
3571                         spin_lock(&llmd->llmd_lock);
3572                         list_del_init(&llr->llr_list);
3573                         llmd->llmd_prefetched--;
3574                         /* Wake up the main engine thread only when the list
3575                          * is empty or half of the prefetched items have been
3576                          * handled to avoid too frequent thread schedule. */
3577                         if (llmd->llmd_prefetched == 0 ||
3578                             (bk->lb_async_windows != 0 &&
3579                              bk->lb_async_windows / 2 ==
3580                              llmd->llmd_prefetched))
3581                                 wakeup = true;
3582                         spin_unlock(&llmd->llmd_lock);
3583                         if (wakeup)
3584                                 wake_up_all(&mthread->t_ctl_waitq);
3585
3586                         lfsck_layout_req_fini(env, llr);
3587                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
3588                                 GOTO(cleanup1, rc);
3589                 }
3590
3591                 l_wait_event(athread->t_ctl_waitq,
3592                              !lfsck_layout_req_empty(llmd) ||
3593                              llmd->llmd_exit ||
3594                              llmd->llmd_to_post ||
3595                              llmd->llmd_to_double_scan,
3596                              &lwi);
3597
3598                 if (unlikely(llmd->llmd_exit))
3599                         GOTO(cleanup1, rc = llmd->llmd_post_result);
3600
3601                 if (!list_empty(&llmd->llmd_req_list))
3602                         continue;
3603
3604                 if (llmd->llmd_to_post) {
3605                         llmd->llmd_to_post = 0;
3606                         LASSERT(llmd->llmd_post_result > 0);
3607
3608                         memset(lr, 0, sizeof(*lr));
3609                         lr->lr_event = LE_PHASE1_DONE;
3610                         lr->lr_status = llmd->llmd_post_result;
3611                         rc = lfsck_layout_master_notify_others(env, com, lr);
3612                         if (rc != 0)
3613                                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant "
3614                                        "failed to notify others for LFSCK "
3615                                        "post: rc = %d\n",
3616                                        lfsck_lfsck2name(lfsck), rc);
3617
3618                         /* Wakeup the master engine to go ahead. */
3619                         wake_up_all(&mthread->t_ctl_waitq);
3620                 }
3621
3622                 if (llmd->llmd_to_double_scan) {
3623                         llmd->llmd_to_double_scan = 0;
3624                         atomic_inc(&lfsck->li_double_scan_count);
3625                         llmd->llmd_in_double_scan = 1;
3626                         wake_up_all(&mthread->t_ctl_waitq);
3627
3628                         CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 "
3629                                "scan start\n", lfsck_lfsck2name(lfsck));
3630
3631                         com->lc_new_checked = 0;
3632                         com->lc_new_scanned = 0;
3633                         com->lc_time_last_checkpoint = cfs_time_current();
3634                         com->lc_time_next_checkpoint =
3635                                 com->lc_time_last_checkpoint +
3636                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
3637
3638                         /* flush all async updating before handling orphan. */
3639                         dt_sync(env, lfsck->li_next);
3640
3641                         while (llmd->llmd_in_double_scan) {
3642                                 struct lfsck_tgt_descs  *ltds =
3643                                                         &lfsck->li_ost_descs;
3644                                 struct lfsck_tgt_desc   *ltd;
3645
3646                                 rc = lfsck_layout_master_query_others(env, com);
3647                                 if (lfsck_layout_master_to_orphan(llmd))
3648                                         goto orphan;
3649
3650                                 if (rc < 0)
3651                                         GOTO(cleanup2, rc);
3652
3653                                 /* Pull LFSCK status on related targets once
3654                                  * per 30 seconds if we are not notified. */
3655                                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(30),
3656                                                            cfs_time_seconds(1),
3657                                                            NULL, NULL);
3658                                 rc = l_wait_event(athread->t_ctl_waitq,
3659                                         lfsck_layout_master_to_orphan(llmd) ||
3660                                         llmd->llmd_exit ||
3661                                         !thread_is_running(mthread),
3662                                         &lwi);
3663
3664                                 if (unlikely(llmd->llmd_exit ||
3665                                              !thread_is_running(mthread)))
3666                                         GOTO(cleanup2, rc = 0);
3667
3668                                 if (rc == -ETIMEDOUT)
3669                                         continue;
3670
3671                                 if (rc < 0)
3672                                         GOTO(cleanup2, rc);
3673
3674 orphan:
3675                                 spin_lock(&ltds->ltd_lock);
3676                                 while (!list_empty(
3677                                                 &llmd->llmd_ost_phase2_list)) {
3678                                         ltd = list_entry(
3679                                               llmd->llmd_ost_phase2_list.next,
3680                                               struct lfsck_tgt_desc,
3681                                               ltd_layout_phase_list);
3682                                         list_del_init(
3683                                                 &ltd->ltd_layout_phase_list);
3684                                         spin_unlock(&ltds->ltd_lock);
3685
3686                                         if (bk->lb_param & LPF_ALL_TGT) {
3687                                                 rc = lfsck_layout_scan_orphan(
3688                                                                 env, com, ltd);
3689                                                 if (rc != 0 &&
3690                                                     bk->lb_param & LPF_FAILOUT)
3691                                                         GOTO(cleanup2, rc);
3692                                         }
3693
3694                                         if (unlikely(llmd->llmd_exit ||
3695                                                 !thread_is_running(mthread)))
3696                                                 GOTO(cleanup2, rc = 0);
3697
3698                                         spin_lock(&ltds->ltd_lock);
3699                                 }
3700
3701                                 if (list_empty(&llmd->llmd_ost_phase1_list)) {
3702                                         spin_unlock(&ltds->ltd_lock);
3703                                         GOTO(cleanup2, rc = 1);
3704                                 }
3705                                 spin_unlock(&ltds->ltd_lock);
3706                         }
3707                 }
3708         }
3709
3710 cleanup1:
3711         /* Cleanup the unfinished requests. */
3712         spin_lock(&llmd->llmd_lock);
3713         if (rc < 0)
3714                 llmd->llmd_assistant_status = rc;
3715
3716         while (!list_empty(&llmd->llmd_req_list)) {
3717                 llr = list_entry(llmd->llmd_req_list.next,
3718                                  struct lfsck_layout_req,
3719                                  llr_list);
3720                 list_del_init(&llr->llr_list);
3721                 llmd->llmd_prefetched--;
3722                 spin_unlock(&llmd->llmd_lock);
3723                 lfsck_layout_req_fini(env, llr);
3724                 spin_lock(&llmd->llmd_lock);
3725         }
3726         spin_unlock(&llmd->llmd_lock);
3727
3728         LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
3729                  llmd->llmd_prefetched);
3730
3731 cleanup2:
3732         memset(lr, 0, sizeof(*lr));
3733         if (rc > 0) {
3734                 lr->lr_event = LE_PHASE2_DONE;
3735                 lr->lr_status = rc;
3736         } else if (rc == 0) {
3737                 if (lfsck->li_flags & LPF_ALL_TGT) {
3738                         lr->lr_event = LE_STOP;
3739                         lr->lr_status = LS_STOPPED;
3740                 } else {
3741                         lr->lr_event = LE_PEER_EXIT;
3742                         switch (lfsck->li_status) {
3743                         case LS_PAUSED:
3744                         case LS_CO_PAUSED:
3745                                 lr->lr_status = LS_CO_PAUSED;
3746                                 break;
3747                         case LS_STOPPED:
3748                         case LS_CO_STOPPED:
3749                                 lr->lr_status = LS_CO_STOPPED;
3750                                 break;
3751                         default:
3752                                 CDEBUG(D_LFSCK, "%s: unknown status: rc = %d\n",
3753                                        lfsck_lfsck2name(lfsck),
3754                                        lfsck->li_status);
3755                                 lr->lr_status = LS_CO_FAILED;
3756                                 break;
3757                         }
3758                 }
3759         } else {
3760                 if (lfsck->li_flags & LPF_ALL_TGT) {
3761                         lr->lr_event = LE_STOP;
3762                         lr->lr_status = LS_FAILED;
3763                 } else {
3764                         lr->lr_event = LE_PEER_EXIT;
3765                         lr->lr_status = LS_CO_FAILED;
3766                 }
3767         }
3768
3769         rc1 = lfsck_layout_master_notify_others(env, com, lr);
3770         if (rc1 != 0) {
3771                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant failed to "
3772                        "notify others for LFSCK quit: rc = %d\n",
3773                        lfsck_lfsck2name(lfsck), rc1);
3774                 rc = rc1;
3775         }
3776
3777         /* Under force exit case, some requests may be just freed without
3778          * verification, those objects should be re-handled when next run.
3779          * So not update the on-disk tracing file under such case. */
3780         if (llmd->llmd_in_double_scan) {
3781                 struct lfsck_layout *lo = com->lc_file_ram;
3782
3783                 if (!llmd->llmd_exit)
3784                         rc1 = lfsck_layout_double_scan_result(env, com, rc);
3785
3786                 CDEBUG(D_LFSCK, "%s: layout LFSCK assistant phase2 scan "
3787                        "finished, status %d: rc = %d\n",
3788                        lfsck_lfsck2name(lfsck), lo->ll_status, rc1);
3789         }
3790
3791 fini:
3792         if (llmd->llmd_in_double_scan)
3793                 atomic_dec(&lfsck->li_double_scan_count);
3794
3795         spin_lock(&llmd->llmd_lock);
3796         llmd->llmd_assistant_status = (rc1 != 0 ? rc1 : rc);
3797         thread_set_flags(athread, SVC_STOPPED);
3798         wake_up_all(&mthread->t_ctl_waitq);
3799         spin_unlock(&llmd->llmd_lock);
3800         lfsck_thread_args_fini(lta);
3801
3802         return rc;
3803 }
3804
3805 static int
3806 lfsck_layout_slave_async_interpret(const struct lu_env *env,
3807                                    struct ptlrpc_request *req,
3808                                    void *args, int rc)
3809 {
3810         struct lfsck_layout_slave_async_args *llsaa = args;
3811         struct obd_export                    *exp   = llsaa->llsaa_exp;
3812         struct lfsck_component               *com   = llsaa->llsaa_com;
3813         struct lfsck_layout_slave_target     *llst  = llsaa->llsaa_llst;
3814         struct lfsck_layout_slave_data       *llsd  = com->lc_data;
3815         struct lfsck_reply                   *lr    = NULL;
3816         bool                                  done  = false;
3817
3818         if (rc != 0) {
3819                 /* It is quite probably caused by target crash,
3820                  * to make the LFSCK can go ahead, assume that
3821                  * the target finished the LFSCK prcoessing. */
3822                 done = true;
3823         } else {
3824                 lr = req_capsule_server_get(&req->rq_pill, &RMF_LFSCK_REPLY);
3825                 if (lr->lr_status != LS_SCANNING_PHASE1 &&
3826                     lr->lr_status != LS_SCANNING_PHASE2)
3827                         done = true;
3828         }
3829
3830         if (done) {
3831                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave gets the MDT %x "
3832                        "status %d\n", lfsck_lfsck2name(com->lc_lfsck),
3833                        llst->llst_index, lr != NULL ? lr->lr_status : rc);
3834
3835                 lfsck_layout_llst_del(llsd, llst);
3836         }
3837
3838         lfsck_layout_llst_put(llst);
3839         lfsck_component_put(env, com);
3840         class_export_put(exp);
3841
3842         return 0;
3843 }
3844
3845 static int lfsck_layout_async_query(const struct lu_env *env,
3846                                     struct lfsck_component *com,
3847                                     struct obd_export *exp,
3848                                     struct lfsck_layout_slave_target *llst,
3849                                     struct lfsck_request *lr,
3850                                     struct ptlrpc_request_set *set)
3851 {
3852         struct lfsck_layout_slave_async_args *llsaa;
3853         struct ptlrpc_request                *req;
3854         struct lfsck_request                 *tmp;
3855         int                                   rc;
3856         ENTRY;
3857
3858         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_QUERY);
3859         if (req == NULL)
3860                 RETURN(-ENOMEM);
3861
3862         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_QUERY);
3863         if (rc != 0) {
3864                 ptlrpc_request_free(req);
3865                 RETURN(rc);
3866         }
3867
3868         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3869         *tmp = *lr;
3870         ptlrpc_request_set_replen(req);
3871
3872         llsaa = ptlrpc_req_async_args(req);
3873         llsaa->llsaa_exp = exp;
3874         llsaa->llsaa_com = lfsck_component_get(com);
3875         llsaa->llsaa_llst = llst;
3876         req->rq_interpret_reply = lfsck_layout_slave_async_interpret;
3877         ptlrpc_set_add_req(set, req);
3878
3879         RETURN(0);
3880 }
3881
3882 static int lfsck_layout_async_notify(const struct lu_env *env,
3883                                      struct obd_export *exp,
3884                                      struct lfsck_request *lr,
3885                                      struct ptlrpc_request_set *set)
3886 {
3887         struct ptlrpc_request   *req;
3888         struct lfsck_request    *tmp;
3889         int                      rc;
3890         ENTRY;
3891
3892         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
3893         if (req == NULL)
3894                 RETURN(-ENOMEM);
3895
3896         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
3897         if (rc != 0) {
3898                 ptlrpc_request_free(req);
3899                 RETURN(rc);
3900         }
3901
3902         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
3903         *tmp = *lr;
3904         ptlrpc_request_set_replen(req);
3905         ptlrpc_set_add_req(set, req);
3906
3907         RETURN(0);
3908 }
3909
3910 static int
3911 lfsck_layout_slave_query_master(const struct lu_env *env,
3912                                 struct lfsck_component *com)
3913 {
3914         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3915         struct lfsck_instance            *lfsck = com->lc_lfsck;
3916         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3917         struct lfsck_layout_slave_target *llst;
3918         struct obd_export                *exp;
3919         struct ptlrpc_request_set        *set;
3920         int                               rc    = 0;
3921         int                               rc1   = 0;
3922         ENTRY;
3923
3924         set = ptlrpc_prep_set();
3925         if (set == NULL)
3926                 GOTO(log, rc = -ENOMEM);
3927
3928         memset(lr, 0, sizeof(*lr));
3929         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
3930         lr->lr_event = LE_QUERY;
3931         lr->lr_active = LFSCK_TYPE_LAYOUT;
3932
3933         llsd->llsd_touch_gen++;
3934         spin_lock(&llsd->llsd_lock);
3935         while (!list_empty(&llsd->llsd_master_list)) {
3936                 llst = list_entry(llsd->llsd_master_list.next,
3937                                   struct lfsck_layout_slave_target,
3938                                   llst_list);
3939                 if (llst->llst_gen == llsd->llsd_touch_gen)
3940                         break;
3941
3942                 llst->llst_gen = llsd->llsd_touch_gen;
3943                 list_del(&llst->llst_list);
3944                 list_add_tail(&llst->llst_list,
3945                               &llsd->llsd_master_list);
3946                 atomic_inc(&llst->llst_ref);
3947                 spin_unlock(&llsd->llsd_lock);
3948
3949                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
3950                                                llst->llst_index);
3951                 if (exp == NULL) {
3952                         lfsck_layout_llst_del(llsd, llst);
3953                         lfsck_layout_llst_put(llst);
3954                         spin_lock(&llsd->llsd_lock);
3955                         continue;
3956                 }
3957
3958                 rc = lfsck_layout_async_query(env, com, exp, llst, lr, set);
3959                 if (rc != 0) {
3960                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
3961                                "query %s for layout: rc = %d\n",
3962                                lfsck_lfsck2name(lfsck),
3963                                exp->exp_obd->obd_name, rc);
3964
3965                         rc1 = rc;
3966                         lfsck_layout_llst_put(llst);
3967                         class_export_put(exp);
3968                 }
3969                 spin_lock(&llsd->llsd_lock);
3970         }
3971         spin_unlock(&llsd->llsd_lock);
3972
3973         rc = ptlrpc_set_wait(set);
3974         ptlrpc_set_destroy(set);
3975
3976         GOTO(log, rc = (rc1 != 0 ? rc1 : rc));
3977
3978 log:
3979         CDEBUG(D_LFSCK, "%s: layout LFSCK slave queries master: rc = %d\n",
3980                lfsck_lfsck2name(com->lc_lfsck), rc);
3981
3982         return rc;
3983 }
3984
3985 static void
3986 lfsck_layout_slave_notify_master(const struct lu_env *env,
3987                                  struct lfsck_component *com,
3988                                  enum lfsck_events event, int result)
3989 {
3990         struct lfsck_instance            *lfsck = com->lc_lfsck;
3991         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
3992         struct lfsck_request             *lr    = &lfsck_env_info(env)->lti_lr;
3993         struct lfsck_layout_slave_target *llst;
3994         struct obd_export                *exp;
3995         struct ptlrpc_request_set        *set;
3996         int                               rc;
3997         ENTRY;
3998
3999         CDEBUG(D_LFSCK, "%s: layout LFSCK slave notifies master\n",
4000                lfsck_lfsck2name(com->lc_lfsck));
4001
4002         set = ptlrpc_prep_set();
4003         if (set == NULL)
4004                 RETURN_EXIT;
4005
4006         memset(lr, 0, sizeof(*lr));
4007         lr->lr_event = event;
4008         lr->lr_flags = LEF_FROM_OST;
4009         lr->lr_status = result;
4010         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
4011         lr->lr_active = LFSCK_TYPE_LAYOUT;
4012         llsd->llsd_touch_gen++;
4013         spin_lock(&llsd->llsd_lock);
4014         while (!list_empty(&llsd->llsd_master_list)) {
4015                 llst = list_entry(llsd->llsd_master_list.next,
4016                                   struct lfsck_layout_slave_target,
4017                                   llst_list);
4018                 if (llst->llst_gen == llsd->llsd_touch_gen)
4019                         break;
4020
4021                 llst->llst_gen = llsd->llsd_touch_gen;
4022                 list_del(&llst->llst_list);
4023                 list_add_tail(&llst->llst_list,
4024                               &llsd->llsd_master_list);
4025                 atomic_inc(&llst->llst_ref);
4026                 spin_unlock(&llsd->llsd_lock);
4027
4028                 exp = lustre_find_lwp_by_index(lfsck->li_obd->obd_name,
4029                                                llst->llst_index);
4030                 if (exp == NULL) {
4031                         lfsck_layout_llst_del(llsd, llst);
4032                         lfsck_layout_llst_put(llst);
4033                         spin_lock(&llsd->llsd_lock);
4034                         continue;
4035                 }
4036
4037                 rc = lfsck_layout_async_notify(env, exp, lr, set);
4038                 if (rc != 0)
4039                         CDEBUG(D_LFSCK, "%s: layout LFSCK slave fail to "
4040                                "notify %s for layout: rc = %d\n",
4041                                lfsck_lfsck2name(lfsck),
4042                                exp->exp_obd->obd_name, rc);
4043
4044                 lfsck_layout_llst_put(llst);
4045                 class_export_put(exp);
4046                 spin_lock(&llsd->llsd_lock);
4047         }
4048         spin_unlock(&llsd->llsd_lock);
4049
4050         ptlrpc_set_wait(set);
4051         ptlrpc_set_destroy(set);
4052
4053         RETURN_EXIT;
4054 }
4055
4056 /*
4057  * \ret -ENODATA: unrecognized stripe
4058  * \ret = 0     : recognized stripe
4059  * \ret < 0     : other failures
4060  */
4061 static int lfsck_layout_master_check_pairs(const struct lu_env *env,
4062                                            struct lfsck_component *com,
4063                                            struct lu_fid *cfid,
4064                                            struct lu_fid *pfid)
4065 {
4066         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4067         struct lu_buf                   *buf    = &info->lti_big_buf;
4068         struct ost_id                   *oi     = &info->lti_oi;
4069         struct dt_object                *obj;
4070         struct lov_mds_md_v1            *lmm;
4071         struct lov_ost_data_v1          *objs;
4072         __u32                            idx    = pfid->f_stripe_idx;
4073         __u32                            magic;
4074         int                              rc     = 0;
4075         int                              i;
4076         __u16                            count;
4077         ENTRY;
4078
4079         pfid->f_ver = 0;
4080         obj = lfsck_object_find_by_dev(env, com->lc_lfsck->li_bottom, pfid);
4081         if (IS_ERR(obj))
4082                 RETURN(PTR_ERR(obj));
4083
4084         dt_read_lock(env, obj, 0);
4085         if (unlikely(!dt_object_exists(obj)))
4086                 GOTO(unlock, rc = -ENOENT);
4087
4088         rc = lfsck_layout_get_lovea(env, obj, buf, NULL);
4089         if (rc < 0)
4090                 GOTO(unlock, rc);
4091
4092         if (rc == 0)
4093                 GOTO(unlock, rc = -ENODATA);
4094
4095         lmm = buf->lb_buf;
4096         rc = lfsck_layout_verify_header(lmm);
4097         if (rc != 0)
4098                 GOTO(unlock, rc);
4099
4100         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4101          * been verified in lfsck_layout_verify_header() already. If some
4102          * new magic introduced in the future, then layout LFSCK needs to
4103          * be updated also. */
4104         magic = le32_to_cpu(lmm->lmm_magic);
4105         if (magic == LOV_MAGIC_V1) {
4106                 objs = &lmm->lmm_objects[0];
4107         } else {
4108                 LASSERT(magic == LOV_MAGIC_V3);
4109                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4110         }
4111
4112         fid_to_ostid(cfid, oi);
4113         count = le16_to_cpu(lmm->lmm_stripe_count);
4114         for (i = 0; i < count; i++, objs++) {
4115                 struct ost_id oi2;
4116
4117                 ostid_le_to_cpu(&objs->l_ost_oi, &oi2);
4118                 if (memcmp(oi, &oi2, sizeof(*oi)) == 0)
4119                         GOTO(unlock, rc = (i != idx ? -ENODATA : 0));
4120         }
4121
4122         GOTO(unlock, rc = -ENODATA);
4123
4124 unlock:
4125         dt_read_unlock(env, obj);
4126         lu_object_put(env, &obj->do_lu);
4127
4128         return rc;
4129 }
4130
4131 /*
4132  * The LFSCK-on-OST will ask the LFSCK-on-MDT to check whether the given
4133  * MDT-object/OST-object pairs match or not to aviod transfer MDT-object
4134  * layout EA from MDT to OST. On one hand, the OST no need to understand
4135  * the layout EA structure; on the other hand, it may cause trouble when
4136  * transfer large layout EA from MDT to OST via normal OUT RPC.
4137  *
4138  * \ret > 0: unrecognized stripe
4139  * \ret = 0: recognized stripe
4140  * \ret < 0: other failures
4141  */
4142 static int lfsck_layout_slave_check_pairs(const struct lu_env *env,
4143                                           struct lfsck_component *com,
4144                                           struct lu_fid *cfid,
4145                                           struct lu_fid *pfid)
4146 {
4147         struct lfsck_instance    *lfsck  = com->lc_lfsck;
4148         struct obd_device        *obd    = lfsck->li_obd;
4149         struct seq_server_site   *ss     =
4150                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
4151         struct obd_export        *exp    = NULL;
4152         struct ptlrpc_request    *req    = NULL;
4153         struct lfsck_request     *lr;
4154         struct lu_seq_range       range  = { 0 };
4155         int                       rc     = 0;
4156         ENTRY;
4157
4158         if (unlikely(fid_is_idif(pfid)))
4159                 RETURN(1);
4160
4161         fld_range_set_any(&range);
4162         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(pfid), &range);
4163         if (rc != 0)
4164                 RETURN(rc == -ENOENT ? 1 : rc);
4165
4166         if (unlikely(!fld_range_is_mdt(&range)))
4167                 RETURN(1);
4168
4169         exp = lustre_find_lwp_by_index(obd->obd_name, range.lsr_index);
4170         if (unlikely(exp == NULL))
4171                 RETURN(1);
4172
4173         if (!(exp_connect_flags(exp) & OBD_CONNECT_LFSCK))
4174                 GOTO(out, rc = -EOPNOTSUPP);
4175
4176         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LFSCK_NOTIFY);
4177         if (req == NULL)
4178                 GOTO(out, rc = -ENOMEM);
4179
4180         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
4181         if (rc != 0) {
4182                 ptlrpc_request_free(req);
4183
4184                 GOTO(out, rc);
4185         }
4186
4187         lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
4188         memset(lr, 0, sizeof(*lr));
4189         lr->lr_event = LE_PAIRS_VERIFY;
4190         lr->lr_active = LFSCK_TYPE_LAYOUT;
4191         lr->lr_fid = *cfid; /* OST-object itself FID. */
4192         lr->lr_fid2 = *pfid; /* The claimed parent FID. */
4193
4194         ptlrpc_request_set_replen(req);
4195         rc = ptlrpc_queue_wait(req);
4196         ptlrpc_req_finished(req);
4197
4198         if (rc == -ENOENT || rc == -ENODATA)
4199                 rc = 1;
4200
4201         GOTO(out, rc);
4202
4203 out:
4204         if (exp != NULL)
4205                 class_export_put(exp);
4206
4207         return rc;
4208 }
4209
4210 static int lfsck_layout_slave_repair_pfid(const struct lu_env *env,
4211                                           struct lfsck_component *com,
4212                                           struct lfsck_request *lr)
4213 {
4214         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4215         struct filter_fid               *ff     = &info->lti_new_pfid;
4216         struct lu_buf                   *buf;
4217         struct dt_device                *dev    = com->lc_lfsck->li_bottom;
4218         struct dt_object                *obj;
4219         struct thandle                  *th     = NULL;
4220         int                              rc     = 0;
4221         ENTRY;
4222
4223         obj = lfsck_object_find_by_dev(env, dev, &lr->lr_fid);
4224         if (IS_ERR(obj))
4225                 GOTO(log, rc = PTR_ERR(obj));
4226
4227         fid_cpu_to_le(&ff->ff_parent, &lr->lr_fid2);
4228         buf = lfsck_buf_get(env, ff, sizeof(*ff));
4229         dt_write_lock(env, obj, 0);
4230         if (unlikely(!dt_object_exists(obj)))
4231                 GOTO(unlock, rc = 0);
4232
4233         th = dt_trans_create(env, dev);
4234         if (IS_ERR(th))
4235                 GOTO(unlock, rc = PTR_ERR(th));
4236
4237         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th);
4238         if (rc != 0)
4239                 GOTO(stop, rc);
4240
4241         rc = dt_trans_start_local(env, dev, th);
4242         if (rc != 0)
4243                 GOTO(stop, rc);
4244
4245         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_FID, 0, th, BYPASS_CAPA);
4246
4247         GOTO(stop, rc);
4248
4249 stop:
4250         dt_trans_stop(env, dev, th);
4251
4252 unlock:
4253         dt_write_unlock(env, obj);
4254         lu_object_put(env, &obj->do_lu);
4255
4256 log:
4257         CDEBUG(D_LFSCK, "%s: layout LFSCK slave repaired pfid for "DFID
4258                ", parent "DFID": rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
4259                PFID(&lr->lr_fid), PFID(&lr->lr_fid2), rc);
4260
4261         return rc;
4262 }
4263
4264 /* layout APIs */
4265
4266 static int lfsck_layout_reset(const struct lu_env *env,
4267                               struct lfsck_component *com, bool init)
4268 {
4269         struct lfsck_layout     *lo    = com->lc_file_ram;
4270         int                      rc;
4271
4272         down_write(&com->lc_sem);
4273         if (init) {
4274                 memset(lo, 0, com->lc_file_size);
4275         } else {
4276                 __u32 count = lo->ll_success_count;
4277                 __u64 last_time = lo->ll_time_last_complete;
4278
4279                 memset(lo, 0, com->lc_file_size);
4280                 lo->ll_success_count = count;
4281                 lo->ll_time_last_complete = last_time;
4282         }
4283
4284         lo->ll_magic = LFSCK_LAYOUT_MAGIC;
4285         lo->ll_status = LS_INIT;
4286
4287         rc = lfsck_layout_store(env, com);
4288         up_write(&com->lc_sem);
4289
4290         CDEBUG(D_LFSCK, "%s: layout LFSCK reset: rc = %d\n",
4291                lfsck_lfsck2name(com->lc_lfsck), rc);
4292
4293         return rc;
4294 }
4295
4296 static void lfsck_layout_fail(const struct lu_env *env,
4297                               struct lfsck_component *com, bool new_checked)
4298 {
4299         struct lfsck_layout *lo = com->lc_file_ram;
4300
4301         down_write(&com->lc_sem);
4302         if (new_checked)
4303                 com->lc_new_checked++;
4304         lfsck_layout_record_failure(env, com->lc_lfsck, lo);
4305         up_write(&com->lc_sem);
4306 }
4307
4308 static int lfsck_layout_master_checkpoint(const struct lu_env *env,
4309                                           struct lfsck_component *com, bool init)
4310 {
4311         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4312         struct lfsck_layout             *lo      = com->lc_file_ram;
4313         struct lfsck_layout_master_data *llmd    = com->lc_data;
4314         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4315         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4316         struct l_wait_info               lwi     = { 0 };
4317         int                              rc;
4318
4319         if (com->lc_new_checked == 0 && !init)
4320                 return 0;
4321
4322         l_wait_event(mthread->t_ctl_waitq,
4323                      list_empty(&llmd->llmd_req_list) ||
4324                      !thread_is_running(mthread) ||
4325                      thread_is_stopped(athread),
4326                      &lwi);
4327
4328         if (!thread_is_running(mthread) || thread_is_stopped(athread))
4329                 return 0;
4330
4331         down_write(&com->lc_sem);
4332         if (init) {
4333                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4334         } else {
4335                 lo->ll_pos_last_checkpoint =
4336                                         lfsck->li_pos_current.lp_oit_cookie;
4337                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4338                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4339                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4340                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4341                 com->lc_new_checked = 0;
4342         }
4343
4344         rc = lfsck_layout_store(env, com);
4345         up_write(&com->lc_sem);
4346
4347         CDEBUG(D_LFSCK, "%s: layout LFSCK master checkpoint at the pos ["
4348                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4349                lfsck->li_pos_current.lp_oit_cookie, rc);
4350
4351         return rc;
4352 }
4353
4354 static int lfsck_layout_slave_checkpoint(const struct lu_env *env,
4355                                          struct lfsck_component *com, bool init)
4356 {
4357         struct lfsck_instance   *lfsck = com->lc_lfsck;
4358         struct lfsck_layout     *lo    = com->lc_file_ram;
4359         int                      rc;
4360
4361         if (com->lc_new_checked == 0 && !init)
4362                 return 0;
4363
4364         down_write(&com->lc_sem);
4365         if (init) {
4366                 lo->ll_pos_latest_start = lfsck->li_pos_current.lp_oit_cookie;
4367         } else {
4368                 lo->ll_pos_last_checkpoint =
4369                                         lfsck->li_pos_current.lp_oit_cookie;
4370                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
4371                                 HALF_SEC - lfsck->li_time_last_checkpoint);
4372                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
4373                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
4374                 com->lc_new_checked = 0;
4375         }
4376
4377         rc = lfsck_layout_store(env, com);
4378         up_write(&com->lc_sem);
4379
4380         CDEBUG(D_LFSCK, "%s: layout LFSCK slave checkpoint at the pos ["
4381                LPU64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
4382                lfsck->li_pos_current.lp_oit_cookie, rc);
4383
4384         return rc;
4385 }
4386
4387 static int lfsck_layout_prep(const struct lu_env *env,
4388                              struct lfsck_component *com,
4389                              struct lfsck_start *start)
4390 {
4391         struct lfsck_instance   *lfsck  = com->lc_lfsck;
4392         struct lfsck_layout     *lo     = com->lc_file_ram;
4393         struct lfsck_position   *pos    = &com->lc_pos_start;
4394
4395         fid_zero(&pos->lp_dir_parent);
4396         pos->lp_dir_cookie = 0;
4397         if (lo->ll_status == LS_COMPLETED ||
4398             lo->ll_status == LS_PARTIAL ||
4399             /* To handle orphan, must scan from the beginning. */
4400             (start != NULL && start->ls_flags & LPF_ORPHAN)) {
4401                 int rc;
4402
4403                 rc = lfsck_layout_reset(env, com, false);
4404                 if (rc == 0)
4405                         rc = lfsck_set_param(env, lfsck, start, true);
4406
4407                 if (rc != 0) {
4408                         CDEBUG(D_LFSCK, "%s: layout LFSCK prep failed: "
4409                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4410
4411                         return rc;
4412                 }
4413         }
4414
4415         down_write(&com->lc_sem);
4416         lo->ll_time_latest_start = cfs_time_current_sec();
4417         spin_lock(&lfsck->li_lock);
4418         if (lo->ll_flags & LF_SCANNED_ONCE) {
4419                 if (!lfsck->li_drop_dryrun ||
4420                     lo->ll_pos_first_inconsistent == 0) {
4421                         lo->ll_status = LS_SCANNING_PHASE2;
4422                         list_del_init(&com->lc_link);
4423                         list_add_tail(&com->lc_link,
4424                                       &lfsck->li_list_double_scan);
4425                         pos->lp_oit_cookie = 0;
4426                 } else {
4427                         int i;
4428
4429                         lo->ll_status = LS_SCANNING_PHASE1;
4430                         lo->ll_run_time_phase1 = 0;
4431                         lo->ll_run_time_phase2 = 0;
4432                         lo->ll_objs_checked_phase1 = 0;
4433                         lo->ll_objs_checked_phase2 = 0;
4434                         lo->ll_objs_failed_phase1 = 0;
4435                         lo->ll_objs_failed_phase2 = 0;
4436                         for (i = 0; i < LLIT_MAX; i++)
4437                                 lo->ll_objs_repaired[i] = 0;
4438
4439                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4440                         fid_zero(&com->lc_fid_latest_scanned_phase2);
4441                 }
4442         } else {
4443                 lo->ll_status = LS_SCANNING_PHASE1;
4444                 if (!lfsck->li_drop_dryrun ||
4445                     lo->ll_pos_first_inconsistent == 0)
4446                         pos->lp_oit_cookie = lo->ll_pos_last_checkpoint + 1;
4447                 else
4448                         pos->lp_oit_cookie = lo->ll_pos_first_inconsistent;
4449         }
4450         spin_unlock(&lfsck->li_lock);
4451         up_write(&com->lc_sem);
4452
4453         return 0;
4454 }
4455
4456 static int lfsck_layout_slave_prep(const struct lu_env *env,
4457                                    struct lfsck_component *com,
4458                                    struct lfsck_start_param *lsp)
4459 {
4460         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4461         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4462         struct lfsck_layout             *lo     = com->lc_file_ram;
4463         struct lfsck_start              *start  = lsp->lsp_start;
4464         int                              rc;
4465
4466         rc = lfsck_layout_prep(env, com, start);
4467         if (rc != 0)
4468                 return rc;
4469
4470         if (lo->ll_flags & LF_CRASHED_LASTID &&
4471             list_empty(&llsd->llsd_master_list)) {
4472                 LASSERT(lfsck->li_out_notify != NULL);
4473
4474                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4475                                      LE_LASTID_REBUILDING);
4476         }
4477
4478         if (!lsp->lsp_index_valid)
4479                 return 0;
4480
4481         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
4482         if (rc == 0 && start != NULL && start->ls_flags & LPF_ORPHAN) {
4483                 LASSERT(!llsd->llsd_rbtree_valid);
4484
4485                 write_lock(&llsd->llsd_rb_lock);
4486                 rc = lfsck_rbtree_setup(env, com);
4487                 write_unlock(&llsd->llsd_rb_lock);
4488         }
4489
4490         CDEBUG(D_LFSCK, "%s: layout LFSCK slave prep done, start pos ["
4491                LPU64"]\n", lfsck_lfsck2name(lfsck),
4492                com->lc_pos_start.lp_oit_cookie);
4493
4494         return rc;
4495 }
4496
4497 static int lfsck_layout_master_prep(const struct lu_env *env,
4498                                     struct lfsck_component *com,
4499                                     struct lfsck_start_param *lsp)
4500 {
4501         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4502         struct lfsck_layout_master_data *llmd    = com->lc_data;
4503         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4504         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4505         struct lfsck_thread_args        *lta;
4506         struct task_struct              *task;
4507         int                              rc;
4508         ENTRY;
4509
4510         rc = lfsck_layout_prep(env, com, lsp->lsp_start);
4511         if (rc != 0)
4512                 RETURN(rc);
4513
4514         llmd->llmd_assistant_status = 0;
4515         llmd->llmd_post_result = 0;
4516         llmd->llmd_to_post = 0;
4517         llmd->llmd_to_double_scan = 0;
4518         llmd->llmd_in_double_scan = 0;
4519         llmd->llmd_exit = 0;
4520         thread_set_flags(athread, 0);
4521
4522         lta = lfsck_thread_args_init(lfsck, com, lsp);
4523         if (IS_ERR(lta))
4524                 RETURN(PTR_ERR(lta));
4525
4526         task = kthread_run(lfsck_layout_assistant, lta, "lfsck_layout");
4527         if (IS_ERR(task)) {
4528                 rc = PTR_ERR(task);
4529                 CERROR("%s: cannot start LFSCK layout assistant thread: "
4530                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
4531                 lfsck_thread_args_fini(lta);
4532         } else {
4533                 struct l_wait_info lwi = { 0 };
4534
4535                 l_wait_event(mthread->t_ctl_waitq,
4536                              thread_is_running(athread) ||
4537                              thread_is_stopped(athread),
4538                              &lwi);
4539                 if (unlikely(!thread_is_running(athread)))
4540                         rc = llmd->llmd_assistant_status;
4541                 else
4542                         rc = 0;
4543         }
4544
4545         CDEBUG(D_LFSCK, "%s: layout LFSCK master prep done, start pos ["
4546                LPU64"\n", lfsck_lfsck2name(lfsck),
4547                com->lc_pos_start.lp_oit_cookie);
4548
4549         RETURN(rc);
4550 }
4551
4552 /* Pre-fetch the attribute for each stripe in the given layout EA. */
4553 static int lfsck_layout_scan_stripes(const struct lu_env *env,
4554                                      struct lfsck_component *com,
4555                                      struct dt_object *parent,
4556                                      struct lov_mds_md_v1 *lmm)
4557 {
4558         struct lfsck_thread_info        *info    = lfsck_env_info(env);
4559         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4560         struct lfsck_bookmark           *bk      = &lfsck->li_bookmark_ram;
4561         struct lfsck_layout             *lo      = com->lc_file_ram;
4562         struct lfsck_layout_master_data *llmd    = com->lc_data;
4563         struct lfsck_layout_object      *llo     = NULL;
4564         struct lov_ost_data_v1          *objs;
4565         struct lfsck_tgt_descs          *ltds    = &lfsck->li_ost_descs;
4566         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4567         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4568                 struct l_wait_info       lwi     = { 0 };
4569         struct lu_buf                   *buf;
4570         int                              rc      = 0;
4571         int                              i;
4572         __u32                            magic;
4573         __u16                            count;
4574         __u16                            gen;
4575         ENTRY;
4576
4577         buf = lfsck_buf_get(env, &info->lti_old_pfid,
4578                             sizeof(struct filter_fid_old));
4579         count = le16_to_cpu(lmm->lmm_stripe_count);
4580         gen = le16_to_cpu(lmm->lmm_layout_gen);
4581         /* Currently, we only support LOV_MAGIC_V1/LOV_MAGIC_V3 which has
4582          * been verified in lfsck_layout_verify_header() already. If some
4583          * new magic introduced in the future, then layout LFSCK needs to
4584          * be updated also. */
4585         magic = le32_to_cpu(lmm->lmm_magic);
4586         if (magic == LOV_MAGIC_V1) {
4587                 objs = &lmm->lmm_objects[0];
4588         } else {
4589                 LASSERT(magic == LOV_MAGIC_V3);
4590                 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
4591         }
4592
4593         for (i = 0; i < count; i++, objs++) {
4594                 struct lu_fid           *fid    = &info->lti_fid;
4595                 struct ost_id           *oi     = &info->lti_oi;
4596                 struct lfsck_layout_req *llr;
4597                 struct lfsck_tgt_desc   *tgt    = NULL;
4598                 struct dt_object        *cobj   = NULL;
4599                 __u32                    index  =
4600                                         le32_to_cpu(objs->l_ost_idx);
4601                 bool                     wakeup = false;
4602
4603                 if (unlikely(lovea_slot_is_dummy(objs)))
4604                         continue;
4605
4606                 l_wait_event(mthread->t_ctl_waitq,
4607                              bk->lb_async_windows == 0 ||
4608                              llmd->llmd_prefetched < bk->lb_async_windows ||
4609                              !thread_is_running(mthread) ||
4610                              thread_is_stopped(athread),
4611                              &lwi);
4612
4613                 if (unlikely(!thread_is_running(mthread)) ||
4614                              thread_is_stopped(athread))
4615                         GOTO(out, rc = 0);
4616
4617                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
4618                 ostid_to_fid(fid, oi, index);
4619                 tgt = lfsck_tgt_get(ltds, index);
4620                 if (unlikely(tgt == NULL)) {
4621                         CDEBUG(D_LFSCK, "%s: cannot talk with OST %x which "
4622                                "did not join the layout LFSCK\n",
4623                                lfsck_lfsck2name(lfsck), index);
4624                         lo->ll_flags |= LF_INCOMPLETE;
4625                         goto next;
4626                 }
4627
4628                 cobj = lfsck_object_find_by_dev(env, tgt->ltd_tgt, fid);
4629                 if (IS_ERR(cobj)) {
4630                         rc = PTR_ERR(cobj);
4631                         goto next;
4632                 }
4633
4634                 rc = dt_declare_attr_get(env, cobj, BYPASS_CAPA);
4635                 if (rc != 0)
4636                         goto next;
4637
4638                 rc = dt_declare_xattr_get(env, cobj, buf, XATTR_NAME_FID,
4639                                           BYPASS_CAPA);
4640                 if (rc != 0)
4641                         goto next;
4642
4643                 if (llo == NULL) {
4644                         llo = lfsck_layout_object_init(env, parent, gen);
4645                         if (IS_ERR(llo)) {
4646                                 rc = PTR_ERR(llo);
4647                                 goto next;
4648                         }
4649                 }
4650
4651                 llr = lfsck_layout_req_init(llo, cobj, index, i);
4652                 if (IS_ERR(llr)) {
4653                         rc = PTR_ERR(llr);
4654                         goto next;
4655                 }
4656
4657                 cobj = NULL;
4658                 spin_lock(&llmd->llmd_lock);
4659                 if (llmd->llmd_assistant_status < 0) {
4660                         spin_unlock(&llmd->llmd_lock);
4661                         lfsck_layout_req_fini(env, llr);
4662                         lfsck_tgt_put(tgt);
4663                         RETURN(llmd->llmd_assistant_status);
4664                 }
4665
4666                 list_add_tail(&llr->llr_list, &llmd->llmd_req_list);
4667                 if (llmd->llmd_prefetched == 0)
4668                         wakeup = true;
4669
4670                 llmd->llmd_prefetched++;
4671                 spin_unlock(&llmd->llmd_lock);
4672                 if (wakeup)
4673                         wake_up_all(&athread->t_ctl_waitq);
4674
4675 next:
4676                 down_write(&com->lc_sem);
4677                 com->lc_new_checked++;
4678                 if (rc < 0)
4679                         lfsck_layout_record_failure(env, lfsck, lo);
4680                 up_write(&com->lc_sem);
4681
4682                 if (cobj != NULL && !IS_ERR(cobj))
4683                         lu_object_put(env, &cobj->do_lu);
4684
4685                 if (likely(tgt != NULL))
4686                         lfsck_tgt_put(tgt);
4687
4688                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
4689                         GOTO(out, rc);
4690         }
4691
4692         GOTO(out, rc = 0);
4693
4694 out:
4695         if (llo != NULL && !IS_ERR(llo))
4696                 lfsck_layout_object_put(env, llo);
4697
4698         return rc;
4699 }
4700
4701 /* For the given object, read its layout EA locally. For each stripe, pre-fetch
4702  * the OST-object's attribute and generate an structure lfsck_layout_req on the
4703  * list ::llmd_req_list.
4704  *
4705  * For each request on above list, the lfsck_layout_assistant thread compares
4706  * the OST side attribute with local attribute, if inconsistent, then repair it.
4707  *
4708  * All above processing is async mode with pipeline. */
4709 static int lfsck_layout_master_exec_oit(const struct lu_env *env,
4710                                         struct lfsck_component *com,
4711                                         struct dt_object *obj)
4712 {
4713         struct lfsck_thread_info        *info   = lfsck_env_info(env);
4714         struct ost_id                   *oi     = &info->lti_oi;
4715         struct lfsck_layout             *lo     = com->lc_file_ram;
4716         struct lfsck_layout_master_data *llmd   = com->lc_data;
4717         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4718         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
4719         struct thandle                  *handle = NULL;
4720         struct lu_buf                   *buf    = &info->lti_big_buf;
4721         struct lov_mds_md_v1            *lmm    = NULL;
4722         struct dt_device                *dev    = lfsck->li_bottom;
4723         struct lustre_handle             lh     = { 0 };
4724         ssize_t                          buflen = buf->lb_len;
4725         int                              rc     = 0;
4726         bool                             locked = false;
4727         bool                             stripe = false;
4728         bool                             bad_oi = false;
4729         ENTRY;
4730
4731         if (!S_ISREG(lfsck_object_type(obj)))
4732                 GOTO(out, rc = 0);
4733
4734         if (llmd->llmd_assistant_status < 0)
4735                 GOTO(out, rc = -ESRCH);
4736
4737         fid_to_lmm_oi(lfsck_dto2fid(obj), oi);
4738         lmm_oi_cpu_to_le(oi, oi);
4739         dt_read_lock(env, obj, 0);
4740         locked = true;
4741
4742 again:
4743         rc = lfsck_layout_get_lovea(env, obj, buf, &buflen);
4744         if (rc <= 0)
4745                 GOTO(out, rc);
4746
4747         buf->lb_len = rc;
4748         lmm = buf->lb_buf;
4749         rc = lfsck_layout_verify_header(lmm);
4750         /* If the LOV EA crashed, then it is possible to be rebuilt later
4751          * when handle orphan OST-objects. */
4752         if (rc != 0)
4753                 GOTO(out, rc);
4754
4755         if (memcmp(oi, &lmm->lmm_oi, sizeof(*oi)) == 0)
4756                 GOTO(out, stripe = true);
4757
4758         /* Inconsistent lmm_oi, should be repaired. */
4759         bad_oi = true;
4760
4761         if (bk->lb_param & LPF_DRYRUN) {
4762                 down_write(&com->lc_sem);
4763                 lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4764                 up_write(&com->lc_sem);
4765
4766                 GOTO(out, stripe = true);
4767         }
4768
4769         if (!lustre_handle_is_used(&lh)) {
4770                 dt_read_unlock(env, obj);
4771                 locked = false;
4772                 buf->lb_len = buflen;
4773                 rc = lfsck_layout_lock(env, com, obj, &lh,
4774                                        MDS_INODELOCK_LAYOUT |
4775                                        MDS_INODELOCK_XATTR);
4776                 if (rc != 0)
4777                         GOTO(out, rc);
4778
4779                 handle = dt_trans_create(env, dev);
4780                 if (IS_ERR(handle))
4781                         GOTO(out, rc = PTR_ERR(handle));
4782
4783                 rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4784                                           LU_XATTR_REPLACE, handle);
4785                 if (rc != 0)
4786                         GOTO(out, rc);
4787
4788                 rc = dt_trans_start_local(env, dev, handle);
4789                 if (rc != 0)
4790                         GOTO(out, rc);
4791
4792                 dt_write_lock(env, obj, 0);
4793                 locked = true;
4794
4795                 goto again;
4796         }
4797
4798         lmm->lmm_oi = *oi;
4799         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LOV,
4800                           LU_XATTR_REPLACE, handle, BYPASS_CAPA);
4801         if (rc != 0)
4802                 GOTO(out, rc);
4803
4804         down_write(&com->lc_sem);
4805         lo->ll_objs_repaired[LLIT_OTHERS - 1]++;
4806         up_write(&com->lc_sem);
4807
4808         GOTO(out, stripe = true);
4809
4810 out:
4811         if (locked) {
4812                 if (lustre_handle_is_used(&lh))
4813                         dt_write_unlock(env, obj);
4814                 else
4815                         dt_read_unlock(env, obj);
4816         }
4817
4818         if (handle != NULL && !IS_ERR(handle))
4819                 dt_trans_stop(env, dev, handle);
4820
4821         lfsck_layout_unlock(&lh);
4822
4823         if (bad_oi)
4824                 CDEBUG(D_LFSCK, "%s: layout LFSCK master %s bad lmm_oi for "
4825                        DFID": rc = %d\n", lfsck_lfsck2name(lfsck),
4826                        bk->lb_param & LPF_DRYRUN ? "found" : "repaired",
4827                        PFID(lfsck_dto2fid(obj)), rc);
4828
4829         if (stripe) {
4830                 rc = lfsck_layout_scan_stripes(env, com, obj, lmm);
4831         } else {
4832                 down_write(&com->lc_sem);
4833                 com->lc_new_checked++;
4834                 if (rc < 0)
4835                         lfsck_layout_record_failure(env, lfsck, lo);
4836                 up_write(&com->lc_sem);
4837         }
4838         buf->lb_len = buflen;
4839
4840         return rc;
4841 }
4842
4843 static int lfsck_layout_slave_exec_oit(const struct lu_env *env,
4844                                        struct lfsck_component *com,
4845                                        struct dt_object *obj)
4846 {
4847         struct lfsck_instance           *lfsck  = com->lc_lfsck;
4848         struct lfsck_layout             *lo     = com->lc_file_ram;
4849         const struct lu_fid             *fid    = lfsck_dto2fid(obj);
4850         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
4851         struct lfsck_layout_seq         *lls;
4852         __u64                            seq;
4853         __u64                            oid;
4854         int                              rc;
4855         ENTRY;
4856
4857         LASSERT(llsd != NULL);
4858
4859         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY5) &&
4860             cfs_fail_val == lfsck_dev_idx(lfsck->li_bottom)) {
4861                 struct l_wait_info       lwi = LWI_TIMEOUT(cfs_time_seconds(1),
4862                                                            NULL, NULL);
4863                 struct ptlrpc_thread    *thread = &lfsck->li_thread;
4864
4865                 l_wait_event(thread->t_ctl_waitq,
4866                              !thread_is_running(thread),
4867                              &lwi);
4868         }
4869
4870         lfsck_rbtree_update_bitmap(env, com, fid, false);
4871
4872         down_write(&com->lc_sem);
4873         if (fid_is_idif(fid))
4874                 seq = 0;
4875         else if (!fid_is_norm(fid) ||
4876                  !fid_is_for_ostobj(env, lfsck->li_next, obj, fid))
4877                 GOTO(unlock, rc = 0);
4878         else
4879                 seq = fid_seq(fid);
4880         com->lc_new_checked++;
4881
4882         lls = lfsck_layout_seq_lookup(llsd, seq);
4883         if (lls == NULL) {
4884                 OBD_ALLOC_PTR(lls);
4885                 if (unlikely(lls == NULL))
4886                         GOTO(unlock, rc = -ENOMEM);
4887
4888                 INIT_LIST_HEAD(&lls->lls_list);
4889                 lls->lls_seq = seq;
4890                 rc = lfsck_layout_lastid_load(env, com, lls);
4891                 if (rc != 0) {
4892                         CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4893                               "load LAST_ID for "LPX64": rc = %d\n",
4894                               lfsck_lfsck2name(com->lc_lfsck), seq, rc);
4895                         lo->ll_objs_failed_phase1++;
4896                         OBD_FREE_PTR(lls);
4897                         GOTO(unlock, rc);
4898                 }
4899
4900                 lfsck_layout_seq_insert(llsd, lls);
4901         }
4902
4903         if (unlikely(fid_is_last_id(fid)))
4904                 GOTO(unlock, rc = 0);
4905
4906         oid = fid_oid(fid);
4907         if (oid > lls->lls_lastid_known)
4908                 lls->lls_lastid_known = oid;
4909
4910         if (oid > lls->lls_lastid) {
4911                 if (!(lo->ll_flags & LF_CRASHED_LASTID)) {
4912                         /* OFD may create new objects during LFSCK scanning. */
4913                         rc = lfsck_layout_lastid_reload(env, com, lls);
4914                         if (unlikely(rc != 0))
4915                                 CDEBUG(D_LFSCK, "%s: layout LFSCK failed to "
4916                                       "reload LAST_ID for "LPX64": rc = %d\n",
4917                                       lfsck_lfsck2name(com->lc_lfsck),
4918                                       lls->lls_seq, rc);
4919                         if (oid <= lls->lls_lastid)
4920                                 GOTO(unlock, rc = 0);
4921
4922                         LASSERT(lfsck->li_out_notify != NULL);
4923
4924                         lfsck->li_out_notify(env, lfsck->li_out_notify_data,
4925                                              LE_LASTID_REBUILDING);
4926                         lo->ll_flags |= LF_CRASHED_LASTID;
4927                 }
4928
4929                 lls->lls_lastid = oid;
4930                 lls->lls_dirty = 1;
4931         }
4932
4933         GOTO(unlock, rc = 0);
4934
4935 unlock:
4936         up_write(&com->lc_sem);
4937
4938         return rc;
4939 }
4940
/*
 * Directory-entry hook of the layout LFSCK component.
 *
 * This implementation does no work for directory entries: every argument is
 * ignored and 0 is always returned.
 */
static int lfsck_layout_exec_dir(const struct lu_env *env,
				 struct lfsck_component *com,
				 struct dt_object *obj,
				 struct lu_dirent *ent)
{
	int rc = 0;

	return rc;
}
4948
4949 static int lfsck_layout_master_post(const struct lu_env *env,
4950                                     struct lfsck_component *com,
4951                                     int result, bool init)
4952 {
4953         struct lfsck_instance           *lfsck   = com->lc_lfsck;
4954         struct lfsck_layout             *lo      = com->lc_file_ram;
4955         struct lfsck_layout_master_data *llmd    = com->lc_data;
4956         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
4957         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
4958         struct l_wait_info               lwi     = { 0 };
4959         int                              rc;
4960         ENTRY;
4961
4962
4963         llmd->llmd_post_result = result;
4964         llmd->llmd_to_post = 1;
4965         if (llmd->llmd_post_result <= 0)
4966                 llmd->llmd_exit = 1;
4967
4968         wake_up_all(&athread->t_ctl_waitq);
4969         l_wait_event(mthread->t_ctl_waitq,
4970                      (result > 0 && list_empty(&llmd->llmd_req_list)) ||
4971                      thread_is_stopped(athread),
4972                      &lwi);
4973
4974         if (llmd->llmd_assistant_status < 0)
4975                 result = llmd->llmd_assistant_status;
4976
4977         down_write(&com->lc_sem);
4978         spin_lock(&lfsck->li_lock);
4979         /* When LFSCK failed, there may be some prefetched objects those are
4980          * not been processed yet, we do not know the exactly position, then
4981          * just restart from last check-point next time. */
4982         if (!init && !llmd->llmd_exit)
4983                 lo->ll_pos_last_checkpoint =
4984                                         lfsck->li_pos_current.lp_oit_cookie;
4985
4986         if (result > 0) {
4987                 lo->ll_status = LS_SCANNING_PHASE2;
4988                 lo->ll_flags |= LF_SCANNED_ONCE;
4989                 lo->ll_flags &= ~LF_UPGRADE;
4990                 list_del_init(&com->lc_link);
4991                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
4992         } else if (result == 0) {
4993                 lo->ll_status = lfsck->li_status;
4994                 if (lo->ll_status == 0)
4995                         lo->ll_status = LS_STOPPED;
4996                 if (lo->ll_status != LS_PAUSED) {
4997                         list_del_init(&com->lc_link);
4998                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
4999                 }
5000         } else {
5001                 lo->ll_status = LS_FAILED;
5002                 list_del_init(&com->lc_link);
5003                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5004         }
5005         spin_unlock(&lfsck->li_lock);
5006
5007         if (!init) {
5008                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5009                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5010                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5011                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5012                 com->lc_new_checked = 0;
5013         }
5014
5015         rc = lfsck_layout_store(env, com);
5016         up_write(&com->lc_sem);
5017
5018         CDEBUG(D_LFSCK, "%s: layout LFSCK master post done: rc = %d\n",
5019                lfsck_lfsck2name(lfsck), rc);
5020
5021         RETURN(rc);
5022 }
5023
5024 static int lfsck_layout_slave_post(const struct lu_env *env,
5025                                    struct lfsck_component *com,
5026                                    int result, bool init)
5027 {
5028         struct lfsck_instance   *lfsck = com->lc_lfsck;
5029         struct lfsck_layout     *lo    = com->lc_file_ram;
5030         int                      rc;
5031         bool                     done  = false;
5032
5033         rc = lfsck_layout_lastid_store(env, com);
5034         if (rc != 0)
5035                 result = rc;
5036
5037         LASSERT(lfsck->li_out_notify != NULL);
5038
5039         down_write(&com->lc_sem);
5040         spin_lock(&lfsck->li_lock);
5041         if (!init)
5042                 lo->ll_pos_last_checkpoint =
5043                                         lfsck->li_pos_current.lp_oit_cookie;
5044         if (result > 0) {
5045                 lo->ll_status = LS_SCANNING_PHASE2;
5046                 lo->ll_flags |= LF_SCANNED_ONCE;
5047                 if (lo->ll_flags & LF_CRASHED_LASTID) {
5048                         done = true;
5049                         lo->ll_flags &= ~LF_CRASHED_LASTID;
5050                 }
5051                 lo->ll_flags &= ~LF_UPGRADE;
5052                 list_del_init(&com->lc_link);
5053                 list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
5054         } else if (result == 0) {
5055                 lo->ll_status = lfsck->li_status;
5056                 if (lo->ll_status == 0)
5057                         lo->ll_status = LS_STOPPED;
5058                 if (lo->ll_status != LS_PAUSED) {
5059                         list_del_init(&com->lc_link);
5060                         list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5061                 }
5062         } else {
5063                 lo->ll_status = LS_FAILED;
5064                 list_del_init(&com->lc_link);
5065                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5066         }
5067         spin_unlock(&lfsck->li_lock);
5068
5069         if (done)
5070                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5071                                      LE_LASTID_REBUILT);
5072
5073         if (!init) {
5074                 lo->ll_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
5075                                 HALF_SEC - lfsck->li_time_last_checkpoint);
5076                 lo->ll_time_last_checkpoint = cfs_time_current_sec();
5077                 lo->ll_objs_checked_phase1 += com->lc_new_checked;
5078                 com->lc_new_checked = 0;
5079         }
5080
5081         rc = lfsck_layout_store(env, com);
5082         up_write(&com->lc_sem);
5083
5084         lfsck_layout_slave_notify_master(env, com, LE_PHASE1_DONE, result);
5085
5086         if (result <= 0)
5087                 lfsck_rbtree_cleanup(env, com);
5088
5089         CDEBUG(D_LFSCK, "%s: layout LFSCK slave post done: rc = %d\n",
5090                lfsck_lfsck2name(lfsck), rc);
5091
5092         return rc;
5093 }
5094
5095 static int lfsck_layout_dump(const struct lu_env *env,
5096                              struct lfsck_component *com, struct seq_file *m)
5097 {
5098         struct lfsck_instance   *lfsck = com->lc_lfsck;
5099         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
5100         struct lfsck_layout     *lo    = com->lc_file_ram;
5101         int                      rc;
5102
5103         down_read(&com->lc_sem);
5104         seq_printf(m, "name: lfsck_layout\n"
5105                       "magic: %#x\n"
5106                       "version: %d\n"
5107                       "status: %s\n",
5108                       lo->ll_magic,
5109                       bk->lb_version,
5110                       lfsck_status2names(lo->ll_status));
5111
5112         rc = lfsck_bits_dump(m, lo->ll_flags, lfsck_flags_names, "flags");
5113         if (rc < 0)
5114                 goto out;
5115
5116         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
5117         if (rc < 0)
5118                 goto out;
5119
5120         rc = lfsck_time_dump(m, lo->ll_time_last_complete,
5121                              "time_since_last_completed");
5122         if (rc < 0)
5123                 goto out;
5124
5125         rc = lfsck_time_dump(m, lo->ll_time_latest_start,
5126                              "time_since_latest_start");
5127         if (rc < 0)
5128                 goto out;
5129
5130         rc = lfsck_time_dump(m, lo->ll_time_last_checkpoint,
5131                              "time_since_last_checkpoint");
5132         if (rc < 0)
5133                 goto out;
5134
5135         seq_printf(m, "latest_start_position: "LPU64"\n"
5136                       "last_checkpoint_position: "LPU64"\n"
5137                       "first_failure_position: "LPU64"\n",
5138                       lo->ll_pos_latest_start,
5139                       lo->ll_pos_last_checkpoint,
5140                       lo->ll_pos_first_inconsistent);
5141
5142         seq_printf(m, "success_count: %u\n"
5143                       "repaired_dangling: "LPU64"\n"
5144                       "repaired_unmatched_pair: "LPU64"\n"
5145                       "repaired_multiple_referenced: "LPU64"\n"
5146                       "repaired_orphan: "LPU64"\n"
5147                       "repaired_inconsistent_owner: "LPU64"\n"
5148                       "repaired_others: "LPU64"\n"
5149                       "skipped: "LPU64"\n"
5150                       "failed_phase1: "LPU64"\n"
5151                       "failed_phase2: "LPU64"\n",
5152                       lo->ll_success_count,
5153                       lo->ll_objs_repaired[LLIT_DANGLING - 1],
5154                       lo->ll_objs_repaired[LLIT_UNMATCHED_PAIR - 1],
5155                       lo->ll_objs_repaired[LLIT_MULTIPLE_REFERENCED - 1],
5156                       lo->ll_objs_repaired[LLIT_ORPHAN - 1],
5157                       lo->ll_objs_repaired[LLIT_INCONSISTENT_OWNER - 1],
5158                       lo->ll_objs_repaired[LLIT_OTHERS - 1],
5159                       lo->ll_objs_skipped,
5160                       lo->ll_objs_failed_phase1,
5161                       lo->ll_objs_failed_phase2);
5162
5163         if (lo->ll_status == LS_SCANNING_PHASE1) {
5164                 __u64 pos;
5165                 const struct dt_it_ops *iops;
5166                 cfs_duration_t duration = cfs_time_current() -
5167                                           lfsck->li_time_last_checkpoint;
5168                 __u64 checked = lo->ll_objs_checked_phase1 +
5169                                 com->lc_new_checked;
5170                 __u64 speed = checked;
5171                 __u64 new_checked = com->lc_new_checked * HZ;
5172                 __u32 rtime = lo->ll_run_time_phase1 +
5173                               cfs_duration_sec(duration + HALF_SEC);
5174
5175                 if (duration != 0)
5176                         do_div(new_checked, duration);
5177                 if (rtime != 0)
5178                         do_div(speed, rtime);
5179                 seq_printf(m, "checked_phase1: "LPU64"\n"
5180                               "checked_phase2: "LPU64"\n"
5181                               "run_time_phase1: %u seconds\n"
5182                               "run_time_phase2: %u seconds\n"
5183                               "average_speed_phase1: "LPU64" items/sec\n"
5184                               "average_speed_phase2: N/A\n"
5185                               "real-time_speed_phase1: "LPU64" items/sec\n"
5186                               "real-time_speed_phase2: N/A\n",
5187                               checked,
5188                               lo->ll_objs_checked_phase2,
5189                               rtime,
5190                               lo->ll_run_time_phase2,
5191                               speed,
5192                               new_checked);
5193
5194                 LASSERT(lfsck->li_di_oit != NULL);
5195
5196                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
5197
5198                 /* The low layer otable-based iteration position may NOT
5199                  * exactly match the layout-based directory traversal
5200                  * cookie. Generally, it is not a serious issue. But the
5201                  * caller should NOT make assumption on that. */
5202                 pos = iops->store(env, lfsck->li_di_oit);
5203                 if (!lfsck->li_current_oit_processed)
5204                         pos--;
5205                 seq_printf(m, "current_position: "LPU64"\n", pos);
5206
5207         } else if (lo->ll_status == LS_SCANNING_PHASE2) {
5208                 cfs_duration_t duration = cfs_time_current() -
5209                                           lfsck->li_time_last_checkpoint;
5210                 __u64 checked = lo->ll_objs_checked_phase2 +
5211                                 com->lc_new_checked;
5212                 __u64 speed1 = lo->ll_objs_checked_phase1;
5213                 __u64 speed2 = checked;
5214                 __u64 new_checked = com->lc_new_checked * HZ;
5215                 __u32 rtime = lo->ll_run_time_phase2 +
5216                               cfs_duration_sec(duration + HALF_SEC);
5217
5218                 if (duration != 0)
5219                         do_div(new_checked, duration);
5220                 if (lo->ll_run_time_phase1 != 0)
5221                         do_div(speed1, lo->ll_run_time_phase1);
5222                 if (rtime != 0)
5223                         do_div(speed2, rtime);
5224                 rc = seq_printf(m, "checked_phase1: "LPU64"\n"
5225                                 "checked_phase2: "LPU64"\n"
5226                                 "run_time_phase1: %u seconds\n"
5227                                 "run_time_phase2: %u seconds\n"
5228                                 "average_speed_phase1: "LPU64" items/sec\n"
5229                                 "average_speed_phase2: "LPU64" items/sec\n"
5230                                 "real-time_speed_phase1: N/A\n"
5231                                 "real-time_speed_phase2: "LPU64" items/sec\n"
5232                                 "current_position: "DFID"\n",
5233                                 lo->ll_objs_checked_phase1,
5234                                 checked,
5235                                 lo->ll_run_time_phase1,
5236                                 rtime,
5237                                 speed1,
5238                                 speed2,
5239                                 new_checked,
5240                                 PFID(&com->lc_fid_latest_scanned_phase2));
5241                 if (rc <= 0)
5242                         goto out;
5243
5244         } else {
5245                 __u64 speed1 = lo->ll_objs_checked_phase1;
5246                 __u64 speed2 = lo->ll_objs_checked_phase2;
5247
5248                 if (lo->ll_run_time_phase1 != 0)
5249                         do_div(speed1, lo->ll_run_time_phase1);
5250                 if (lo->ll_run_time_phase2 != 0)
5251                         do_div(speed2, lo->ll_run_time_phase2);
5252                 seq_printf(m, "checked_phase1: "LPU64"\n"
5253                            "checked_phase2: "LPU64"\n"
5254                            "run_time_phase1: %u seconds\n"
5255                            "run_time_phase2: %u seconds\n"
5256                            "average_speed_phase1: "LPU64" items/sec\n"
5257                            "average_speed_phase2: "LPU64" objs/sec\n"
5258                            "real-time_speed_phase1: N/A\n"
5259                            "real-time_speed_phase2: N/A\n"
5260                            "current_position: N/A\n",
5261                            lo->ll_objs_checked_phase1,
5262                            lo->ll_objs_checked_phase2,
5263                            lo->ll_run_time_phase1,
5264                            lo->ll_run_time_phase2,
5265                            speed1,
5266                            speed2);
5267         }
5268 out:
5269         up_read(&com->lc_sem);
5270
5271         return rc;
5272 }
5273
5274 static int lfsck_layout_master_double_scan(const struct lu_env *env,
5275                                            struct lfsck_component *com)
5276 {
5277         struct lfsck_layout_master_data *llmd    = com->lc_data;
5278         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5279         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5280         struct lfsck_layout             *lo      = com->lc_file_ram;
5281         struct l_wait_info               lwi     = { 0 };
5282
5283         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2))
5284                 return 0;
5285
5286         llmd->llmd_to_double_scan = 1;
5287         wake_up_all(&athread->t_ctl_waitq);
5288         l_wait_event(mthread->t_ctl_waitq,
5289                      llmd->llmd_in_double_scan ||
5290                      thread_is_stopped(athread),
5291                      &lwi);
5292         if (llmd->llmd_assistant_status < 0)
5293                 return llmd->llmd_assistant_status;
5294
5295         return 0;
5296 }
5297
5298 static int lfsck_layout_slave_double_scan(const struct lu_env *env,
5299                                           struct lfsck_component *com)
5300 {
5301         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5302         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
5303         struct lfsck_layout             *lo     = com->lc_file_ram;
5304         struct ptlrpc_thread            *thread = &lfsck->li_thread;
5305         int                              rc;
5306         ENTRY;
5307
5308         if (unlikely(lo->ll_status != LS_SCANNING_PHASE2)) {
5309                 lfsck_rbtree_cleanup(env, com);
5310                 lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, 0);
5311                 RETURN(0);
5312         }
5313
5314         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan start\n",
5315                lfsck_lfsck2name(lfsck));
5316
5317         atomic_inc(&lfsck->li_double_scan_count);
5318
5319         com->lc_new_checked = 0;
5320         com->lc_new_scanned = 0;
5321         com->lc_time_last_checkpoint = cfs_time_current();
5322         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
5323                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
5324
5325         while (1) {
5326                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(30),
5327                                                      NULL, NULL);
5328
5329                 rc = lfsck_layout_slave_query_master(env, com);
5330                 if (list_empty(&llsd->llsd_master_list)) {
5331                         if (unlikely(!thread_is_running(thread)))
5332                                 rc = 0;
5333                         else
5334                                 rc = 1;
5335
5336                         GOTO(done, rc);
5337                 }
5338
5339                 if (rc < 0)
5340                         GOTO(done, rc);
5341
5342                 rc = l_wait_event(thread->t_ctl_waitq,
5343                                   !thread_is_running(thread) ||
5344                                   list_empty(&llsd->llsd_master_list),
5345                                   &lwi);
5346                 if (unlikely(!thread_is_running(thread)))
5347                         GOTO(done, rc = 0);
5348
5349                 if (rc == -ETIMEDOUT)
5350                         continue;
5351
5352                 GOTO(done, rc = (rc < 0 ? rc : 1));
5353         }
5354
5355 done:
5356         rc = lfsck_layout_double_scan_result(env, com, rc);
5357
5358         lfsck_rbtree_cleanup(env, com);
5359         lfsck_layout_slave_notify_master(env, com, LE_PHASE2_DONE, rc);
5360         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
5361                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5362
5363         CDEBUG(D_LFSCK, "%s: layout LFSCK slave phase2 scan finished, "
5364                "status %d: rc = %d\n",
5365                lfsck_lfsck2name(lfsck), lo->ll_status, rc);
5366
5367         return rc;
5368 }
5369
5370 static void lfsck_layout_master_data_release(const struct lu_env *env,
5371                                              struct lfsck_component *com)
5372 {
5373         struct lfsck_layout_master_data *llmd   = com->lc_data;
5374         struct lfsck_instance           *lfsck  = com->lc_lfsck;
5375         struct lfsck_tgt_descs          *ltds;
5376         struct lfsck_tgt_desc           *ltd;
5377         struct lfsck_tgt_desc           *next;
5378
5379         LASSERT(llmd != NULL);
5380         LASSERT(thread_is_init(&llmd->llmd_thread) ||
5381                 thread_is_stopped(&llmd->llmd_thread));
5382         LASSERT(list_empty(&llmd->llmd_req_list));
5383
5384         com->lc_data = NULL;
5385
5386         ltds = &lfsck->li_ost_descs;
5387         spin_lock(&ltds->ltd_lock);
5388         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase1_list,
5389                                  ltd_layout_phase_list) {
5390                 list_del_init(&ltd->ltd_layout_phase_list);
5391         }
5392         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_phase2_list,
5393                                  ltd_layout_phase_list) {
5394                 list_del_init(&ltd->ltd_layout_phase_list);
5395         }
5396         list_for_each_entry_safe(ltd, next, &llmd->llmd_ost_list,
5397                                  ltd_layout_list) {
5398                 list_del_init(&ltd->ltd_layout_list);
5399         }
5400         spin_unlock(&ltds->ltd_lock);
5401
5402         ltds = &lfsck->li_mdt_descs;
5403         spin_lock(&ltds->ltd_lock);
5404         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase1_list,
5405                                  ltd_layout_phase_list) {
5406                 list_del_init(&ltd->ltd_layout_phase_list);
5407         }
5408         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_phase2_list,
5409                                  ltd_layout_phase_list) {
5410                 list_del_init(&ltd->ltd_layout_phase_list);
5411         }
5412         list_for_each_entry_safe(ltd, next, &llmd->llmd_mdt_list,
5413                                  ltd_layout_list) {
5414                 list_del_init(&ltd->ltd_layout_list);
5415         }
5416         spin_unlock(&ltds->ltd_lock);
5417
5418         OBD_FREE_PTR(llmd);
5419 }
5420
5421 static void lfsck_layout_slave_data_release(const struct lu_env *env,
5422                                             struct lfsck_component *com)
5423 {
5424         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5425         struct lfsck_layout_seq          *lls;
5426         struct lfsck_layout_seq          *next;
5427         struct lfsck_layout_slave_target *llst;
5428         struct lfsck_layout_slave_target *tmp;
5429
5430         LASSERT(llsd != NULL);
5431
5432         list_for_each_entry_safe(lls, next, &llsd->llsd_seq_list,
5433                                      lls_list) {
5434                 list_del_init(&lls->lls_list);
5435                 lfsck_object_put(env, lls->lls_lastid_obj);
5436                 OBD_FREE_PTR(lls);
5437         }
5438
5439         list_for_each_entry_safe(llst, tmp, &llsd->llsd_master_list,
5440                                  llst_list) {
5441                 list_del_init(&llst->llst_list);
5442                 OBD_FREE_PTR(llst);
5443         }
5444
5445         lfsck_rbtree_cleanup(env, com);
5446         com->lc_data = NULL;
5447         OBD_FREE_PTR(llsd);
5448 }
5449
5450 static void lfsck_layout_master_quit(const struct lu_env *env,
5451                                      struct lfsck_component *com)
5452 {
5453         struct lfsck_layout_master_data *llmd    = com->lc_data;
5454         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
5455         struct ptlrpc_thread            *athread = &llmd->llmd_thread;
5456         struct l_wait_info               lwi     = { 0 };
5457
5458         llmd->llmd_exit = 1;
5459         wake_up_all(&athread->t_ctl_waitq);
5460         l_wait_event(mthread->t_ctl_waitq,
5461                      thread_is_init(athread) ||
5462                      thread_is_stopped(athread),
5463                      &lwi);
5464 }
5465
/*
 * Quit handler for the layout LFSCK slave: the only work done here is
 * cleaning up the rbtree attached to \a com.
 */
static void lfsck_layout_slave_quit(const struct lu_env *env,
                                    struct lfsck_component *com)
{
        lfsck_rbtree_cleanup(env, com);
}
5471
5472 static int lfsck_layout_master_in_notify(const struct lu_env *env,
5473                                          struct lfsck_component *com,
5474                                          struct lfsck_request *lr)
5475 {
5476         struct lfsck_instance           *lfsck = com->lc_lfsck;
5477         struct lfsck_layout             *lo    = com->lc_file_ram;
5478         struct lfsck_layout_master_data *llmd  = com->lc_data;
5479         struct lfsck_tgt_descs          *ltds;
5480         struct lfsck_tgt_desc           *ltd;
5481         bool                             fail  = false;
5482         ENTRY;
5483
5484         if (lr->lr_event == LE_PAIRS_VERIFY) {
5485                 int rc;
5486
5487                 rc = lfsck_layout_master_check_pairs(env, com, &lr->lr_fid,
5488                                                      &lr->lr_fid2);
5489
5490                 RETURN(rc);
5491         }
5492
5493         CDEBUG(D_LFSCK, "%s: layout LFSCK master handle notify %u "
5494                "from %s %x, status %d\n", lfsck_lfsck2name(lfsck),
5495                lr->lr_event, (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5496                lr->lr_index, lr->lr_status);
5497
5498         if (lr->lr_event != LE_PHASE1_DONE &&
5499             lr->lr_event != LE_PHASE2_DONE &&
5500             lr->lr_event != LE_PEER_EXIT)
5501                 RETURN(-EINVAL);
5502
5503         if (lr->lr_flags & LEF_FROM_OST)
5504                 ltds = &lfsck->li_ost_descs;
5505         else
5506                 ltds = &lfsck->li_mdt_descs;
5507         spin_lock(&ltds->ltd_lock);
5508         ltd = LTD_TGT(ltds, lr->lr_index);
5509         if (ltd == NULL) {
5510                 spin_unlock(&ltds->ltd_lock);
5511
5512                 RETURN(-ENXIO);
5513         }
5514
5515         list_del_init(&ltd->ltd_layout_phase_list);
5516         switch (lr->lr_event) {
5517         case LE_PHASE1_DONE:
5518                 if (lr->lr_status <= 0) {
5519                         ltd->ltd_layout_done = 1;
5520                         list_del_init(&ltd->ltd_layout_list);
5521                         lo->ll_flags |= LF_INCOMPLETE;
5522                         fail = true;
5523                         break;
5524                 }
5525
5526                 if (lr->lr_flags & LEF_FROM_OST) {
5527                         if (list_empty(&ltd->ltd_layout_list))
5528                                 list_add_tail(&ltd->ltd_layout_list,
5529                                               &llmd->llmd_ost_list);
5530                         list_add_tail(&ltd->ltd_layout_phase_list,
5531                                       &llmd->llmd_ost_phase2_list);
5532                 } else {
5533                         if (list_empty(&ltd->ltd_layout_list))
5534                                 list_add_tail(&ltd->ltd_layout_list,
5535                                               &llmd->llmd_mdt_list);
5536                         list_add_tail(&ltd->ltd_layout_phase_list,
5537                                       &llmd->llmd_mdt_phase2_list);
5538                 }
5539                 break;
5540         case LE_PHASE2_DONE:
5541                 ltd->ltd_layout_done = 1;
5542                 list_del_init(&ltd->ltd_layout_list);
5543                 break;
5544         case LE_PEER_EXIT:
5545                 fail = true;
5546                 ltd->ltd_layout_done = 1;
5547                 list_del_init(&ltd->ltd_layout_list);
5548                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT))
5549                         lo->ll_flags |= LF_INCOMPLETE;
5550                 break;
5551         default:
5552                 break;
5553         }
5554         spin_unlock(&ltds->ltd_lock);
5555
5556         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5557                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5558
5559                 memset(stop, 0, sizeof(*stop));
5560                 stop->ls_status = lr->lr_status;
5561                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5562                 lfsck_stop(env, lfsck->li_bottom, stop);
5563         } else if (lfsck_layout_master_to_orphan(llmd)) {
5564                 wake_up_all(&llmd->llmd_thread.t_ctl_waitq);
5565         }
5566
5567         RETURN(0);
5568 }
5569
5570 static int lfsck_layout_slave_in_notify(const struct lu_env *env,
5571                                         struct lfsck_component *com,
5572                                         struct lfsck_request *lr)
5573 {
5574         struct lfsck_instance            *lfsck = com->lc_lfsck;
5575         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5576         struct lfsck_layout_slave_target *llst;
5577         int                               rc;
5578         ENTRY;
5579
5580         switch (lr->lr_event) {
5581         case LE_FID_ACCESSED:
5582                 lfsck_rbtree_update_bitmap(env, com, &lr->lr_fid, true);
5583                 RETURN(0);
5584         case LE_CONDITIONAL_DESTROY:
5585                 rc = lfsck_layout_slave_conditional_destroy(env, com, lr);
5586                 RETURN(rc);
5587         case LE_PAIRS_VERIFY: {
5588                 lr->lr_status = LPVS_INIT;
5589                 /* Firstly, if the MDT-object which is claimed via OST-object
5590                  * local stored PFID xattr recognizes the OST-object, then it
5591                  * must be that the client given PFID is wrong. */
5592                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5593                                                     &lr->lr_fid3);
5594                 if (rc <= 0)
5595                         RETURN(0);
5596
5597                 lr->lr_status = LPVS_INCONSISTENT;
5598                 /* The OST-object local stored PFID xattr is stale. We need to
5599                  * check whether the MDT-object that is claimed via the client
5600                  * given PFID information recognizes the OST-object or not. If
5601                  * matches, then need to update the OST-object's PFID xattr. */
5602                 rc = lfsck_layout_slave_check_pairs(env, com, &lr->lr_fid,
5603                                                     &lr->lr_fid2);
5604                 /* For rc < 0 case:
5605                  * We are not sure whether the client given PFID information
5606                  * is correct or not, do nothing to avoid improper fixing.
5607                  *
5608                  * For rc > 0 case:
5609                  * The client given PFID information is also invalid, we can
5610                  * NOT fix the OST-object inconsistency.
5611                  */
5612                 if (rc != 0)
5613                         RETURN(rc);
5614
5615                 lr->lr_status = LPVS_INCONSISTENT_TOFIX;
5616                 rc = lfsck_layout_slave_repair_pfid(env, com, lr);
5617
5618                 RETURN(rc);
5619         }
5620         case LE_PHASE2_DONE:
5621         case LE_PEER_EXIT:
5622                 CDEBUG(D_LFSCK, "%s: layout LFSCK slave handle notify %u "
5623                        "from MDT %x, status %d\n", lfsck_lfsck2name(lfsck),
5624                        lr->lr_event, lr->lr_index, lr->lr_status);
5625                 break;
5626         default:
5627                 RETURN(-EINVAL);
5628         }
5629
5630         llst = lfsck_layout_llst_find_and_del(llsd, lr->lr_index, true);
5631         if (llst == NULL)
5632                 RETURN(-ENXIO);
5633
5634         lfsck_layout_llst_put(llst);
5635         if (list_empty(&llsd->llsd_master_list))
5636                 wake_up_all(&lfsck->li_thread.t_ctl_waitq);
5637
5638         if (lr->lr_event == LE_PEER_EXIT &&
5639             lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
5640                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
5641
5642                 memset(stop, 0, sizeof(*stop));
5643                 stop->ls_status = lr->lr_status;
5644                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
5645                 lfsck_stop(env, lfsck->li_bottom, stop);
5646         }
5647
5648         RETURN(0);
5649 }
5650
5651 static int lfsck_layout_query(const struct lu_env *env,
5652                               struct lfsck_component *com)
5653 {
5654         struct lfsck_layout *lo = com->lc_file_ram;
5655
5656         return lo->ll_status;
5657 }
5658
5659 static int lfsck_layout_master_stop_notify(const struct lu_env *env,
5660                                            struct lfsck_component *com,
5661                                            struct lfsck_tgt_descs *ltds,
5662                                            struct lfsck_tgt_desc *ltd,
5663                                            struct ptlrpc_request_set *set)
5664 {
5665         struct lfsck_thread_info          *info  = lfsck_env_info(env);
5666         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
5667         struct lfsck_request              *lr    = &info->lti_lr;
5668         struct lfsck_instance             *lfsck = com->lc_lfsck;
5669         int                                rc;
5670
5671         spin_lock(&ltds->ltd_lock);
5672         if (list_empty(&ltd->ltd_layout_list)) {
5673                 LASSERT(list_empty(&ltd->ltd_layout_phase_list));
5674                 spin_unlock(&ltds->ltd_lock);
5675
5676                 return 0;
5677         }
5678
5679         list_del_init(&ltd->ltd_layout_phase_list);
5680         list_del_init(&ltd->ltd_layout_list);
5681         spin_unlock(&ltds->ltd_lock);
5682
5683         memset(lr, 0, sizeof(*lr));
5684         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
5685         lr->lr_event = LE_PEER_EXIT;
5686         lr->lr_active = LFSCK_TYPE_LAYOUT;
5687         lr->lr_status = LS_CO_PAUSED;
5688         if (ltds == &lfsck->li_ost_descs)
5689                 lr->lr_flags = LEF_TO_OST;
5690
5691         laia->laia_com = com;
5692         laia->laia_ltds = ltds;
5693         atomic_inc(&ltd->ltd_ref);
5694         laia->laia_ltd = ltd;
5695         laia->laia_lr = lr;
5696         laia->laia_shared = 0;
5697
5698         rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
5699                                  lfsck_layout_master_async_interpret,
5700                                  laia, LFSCK_NOTIFY);
5701         if (rc != 0) {
5702                 CDEBUG(D_LFSCK, "%s: layout LFSCK fail to notify %s %x "
5703                        "for co-stop: rc = %d\n",
5704                        lfsck_lfsck2name(lfsck),
5705                        (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
5706                        ltd->ltd_index, rc);
5707                 lfsck_tgt_put(ltd);
5708         }
5709
5710         return rc;
5711 }
5712
5713 /* with lfsck::li_lock held */
5714 static int lfsck_layout_slave_join(const struct lu_env *env,
5715                                    struct lfsck_component *com,
5716                                    struct lfsck_start_param *lsp)
5717 {
5718         struct lfsck_instance            *lfsck = com->lc_lfsck;
5719         struct lfsck_layout_slave_data   *llsd  = com->lc_data;
5720         struct lfsck_layout_slave_target *llst;
5721         struct lfsck_start               *start = lsp->lsp_start;
5722         int                               rc    = 0;
5723         ENTRY;
5724
5725         if (start == NULL || !(start->ls_flags & LPF_ORPHAN))
5726                 RETURN(0);
5727
5728         if (!lsp->lsp_index_valid)
5729                 RETURN(-EINVAL);
5730
5731         /* If someone is running the LFSCK without orphan handling,
5732          * it will not maintain the object accessing rbtree. So we
5733          * cannot join it for orphan handling. */
5734         if (!llsd->llsd_rbtree_valid)
5735                 RETURN(-EBUSY);
5736
5737         spin_unlock(&lfsck->li_lock);
5738         rc = lfsck_layout_llst_add(llsd, lsp->lsp_index);
5739         spin_lock(&lfsck->li_lock);
5740         if (rc == 0 && !thread_is_running(&lfsck->li_thread)) {
5741                 spin_unlock(&lfsck->li_lock);
5742                 llst = lfsck_layout_llst_find_and_del(llsd, lsp->lsp_index,
5743                                                       true);
5744                 if (llst != NULL)
5745                         lfsck_layout_llst_put(llst);
5746                 spin_lock(&lfsck->li_lock);
5747                 rc = -EAGAIN;
5748         }
5749
5750         RETURN(rc);
5751 }
5752
5753 static struct lfsck_operations lfsck_layout_master_ops = {
5754         .lfsck_reset            = lfsck_layout_reset,
5755         .lfsck_fail             = lfsck_layout_fail,
5756         .lfsck_checkpoint       = lfsck_layout_master_checkpoint,
5757         .lfsck_prep             = lfsck_layout_master_prep,
5758         .lfsck_exec_oit         = lfsck_layout_master_exec_oit,
5759         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5760         .lfsck_post             = lfsck_layout_master_post,
5761         .lfsck_interpret        = lfsck_layout_master_async_interpret,
5762         .lfsck_dump             = lfsck_layout_dump,
5763         .lfsck_double_scan      = lfsck_layout_master_double_scan,
5764         .lfsck_data_release     = lfsck_layout_master_data_release,
5765         .lfsck_quit             = lfsck_layout_master_quit,
5766         .lfsck_in_notify        = lfsck_layout_master_in_notify,
5767         .lfsck_query            = lfsck_layout_query,
5768         .lfsck_stop_notify      = lfsck_layout_master_stop_notify,
5769 };
5770
5771 static struct lfsck_operations lfsck_layout_slave_ops = {
5772         .lfsck_reset            = lfsck_layout_reset,
5773         .lfsck_fail             = lfsck_layout_fail,
5774         .lfsck_checkpoint       = lfsck_layout_slave_checkpoint,
5775         .lfsck_prep             = lfsck_layout_slave_prep,
5776         .lfsck_exec_oit         = lfsck_layout_slave_exec_oit,
5777         .lfsck_exec_dir         = lfsck_layout_exec_dir,
5778         .lfsck_post             = lfsck_layout_slave_post,
5779         .lfsck_dump             = lfsck_layout_dump,
5780         .lfsck_double_scan      = lfsck_layout_slave_double_scan,
5781         .lfsck_data_release     = lfsck_layout_slave_data_release,
5782         .lfsck_quit             = lfsck_layout_slave_quit,
5783         .lfsck_in_notify        = lfsck_layout_slave_in_notify,
5784         .lfsck_query            = lfsck_layout_query,
5785         .lfsck_join             = lfsck_layout_slave_join,
5786 };
5787
5788 int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
5789 {
5790         struct lfsck_component  *com;
5791         struct lfsck_layout     *lo;
5792         struct dt_object        *root = NULL;
5793         struct dt_object        *obj;
5794         int                      rc;
5795         ENTRY;
5796
5797         OBD_ALLOC_PTR(com);
5798         if (com == NULL)
5799                 RETURN(-ENOMEM);
5800
5801         INIT_LIST_HEAD(&com->lc_link);
5802         INIT_LIST_HEAD(&com->lc_link_dir);
5803         init_rwsem(&com->lc_sem);
5804         atomic_set(&com->lc_ref, 1);
5805         com->lc_lfsck = lfsck;
5806         com->lc_type = LFSCK_TYPE_LAYOUT;
5807         if (lfsck->li_master) {
5808                 struct lfsck_layout_master_data *llmd;
5809
5810                 com->lc_ops = &lfsck_layout_master_ops;
5811                 OBD_ALLOC_PTR(llmd);
5812                 if (llmd == NULL)
5813                         GOTO(out, rc = -ENOMEM);
5814
5815                 INIT_LIST_HEAD(&llmd->llmd_req_list);
5816                 spin_lock_init(&llmd->llmd_lock);
5817                 INIT_LIST_HEAD(&llmd->llmd_ost_list);
5818                 INIT_LIST_HEAD(&llmd->llmd_ost_phase1_list);
5819                 INIT_LIST_HEAD(&llmd->llmd_ost_phase2_list);
5820                 INIT_LIST_HEAD(&llmd->llmd_mdt_list);
5821                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
5822                 INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
5823                 init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
5824                 com->lc_data = llmd;
5825         } else {
5826                 struct lfsck_layout_slave_data *llsd;
5827
5828                 com->lc_ops = &lfsck_layout_slave_ops;
5829                 OBD_ALLOC_PTR(llsd);
5830                 if (llsd == NULL)
5831                         GOTO(out, rc = -ENOMEM);
5832
5833                 INIT_LIST_HEAD(&llsd->llsd_seq_list);
5834                 INIT_LIST_HEAD(&llsd->llsd_master_list);
5835                 spin_lock_init(&llsd->llsd_lock);
5836                 llsd->llsd_rb_root = RB_ROOT;
5837                 rwlock_init(&llsd->llsd_rb_lock);
5838                 com->lc_data = llsd;
5839         }
5840         com->lc_file_size = sizeof(*lo);
5841         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
5842         if (com->lc_file_ram == NULL)
5843                 GOTO(out, rc = -ENOMEM);
5844
5845         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
5846         if (com->lc_file_disk == NULL)
5847                 GOTO(out, rc = -ENOMEM);
5848
5849         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
5850         if (IS_ERR(root))
5851                 GOTO(out, rc = PTR_ERR(root));
5852
5853         if (unlikely(!dt_try_as_dir(env, root)))
5854                 GOTO(out, rc = -ENOTDIR);
5855
5856         obj = local_file_find_or_create(env, lfsck->li_los, root,
5857                                         lfsck_layout_name,
5858                                         S_IFREG | S_IRUGO | S_IWUSR);
5859         if (IS_ERR(obj))
5860                 GOTO(out, rc = PTR_ERR(obj));
5861
5862         com->lc_obj = obj;
5863         rc = lfsck_layout_load(env, com);
5864         if (rc > 0)
5865                 rc = lfsck_layout_reset(env, com, true);
5866         else if (rc == -ENOENT)
5867                 rc = lfsck_layout_init(env, com);
5868
5869         if (rc != 0)
5870                 GOTO(out, rc);
5871
5872         lo = com->lc_file_ram;
5873         switch (lo->ll_status) {
5874         case LS_INIT:
5875         case LS_COMPLETED:
5876         case LS_FAILED:
5877         case LS_STOPPED:
5878         case LS_PARTIAL:
5879                 spin_lock(&lfsck->li_lock);
5880                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
5881                 spin_unlock(&lfsck->li_lock);
5882                 break;
5883         default:
5884                 CERROR("%s: unknown lfsck_layout status %d\n",
5885                        lfsck_lfsck2name(lfsck), lo->ll_status);
5886                 /* fall through */
5887         case LS_SCANNING_PHASE1:
5888         case LS_SCANNING_PHASE2:
5889                 /* No need to store the status to disk right now.
5890                  * If the system crashed before the status stored,
5891                  * it will be loaded back when next time. */
5892                 lo->ll_status = LS_CRASHED;
5893                 lo->ll_flags |= LF_INCOMPLETE;
5894                 /* fall through */
5895         case LS_PAUSED:
5896         case LS_CRASHED:
5897         case LS_CO_FAILED:
5898         case LS_CO_STOPPED:
5899         case LS_CO_PAUSED:
5900                 spin_lock(&lfsck->li_lock);
5901                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
5902                 spin_unlock(&lfsck->li_lock);
5903                 break;
5904         }
5905
5906         if (lo->ll_flags & LF_CRASHED_LASTID) {
5907                 LASSERT(lfsck->li_out_notify != NULL);
5908
5909                 lfsck->li_out_notify(env, lfsck->li_out_notify_data,
5910                                      LE_LASTID_REBUILDING);
5911         }
5912
5913         GOTO(out, rc = 0);
5914
5915 out:
5916         if (root != NULL && !IS_ERR(root))
5917                 lu_object_put(env, &root->do_lu);
5918
5919         if (rc != 0) {
5920                 lfsck_component_cleanup(env, com);
5921                 CERROR("%s: fail to init layout LFSCK component: rc = %d\n",
5922                        lfsck_lfsck2name(lfsck), rc);
5923         }
5924
5925         return rc;
5926 }
5927
5928 struct lfsck_orphan_it {
5929         struct lfsck_component           *loi_com;
5930         struct lfsck_rbtree_node         *loi_lrn;
5931         struct lfsck_layout_slave_target *loi_llst;
5932         struct lu_fid                     loi_key;
5933         struct lu_orphan_rec              loi_rec;
5934         __u64                             loi_hash;
5935         unsigned int                      loi_over:1;
5936 };
5937
5938 static int lfsck_fid_match_idx(const struct lu_env *env,
5939                                struct lfsck_instance *lfsck,
5940                                const struct lu_fid *fid, int idx)
5941 {
5942         struct seq_server_site  *ss;
5943         struct lu_server_fld    *sf;
5944         struct lu_seq_range      range  = { 0 };
5945         int                      rc;
5946
5947         /* All abnormal cases will be returned to MDT0. */
5948         if (!fid_is_norm(fid)) {
5949                 if (idx == 0)
5950                         return 1;
5951
5952                 return 0;
5953         }
5954
5955         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
5956         if (unlikely(ss == NULL))
5957                 return -ENOTCONN;
5958
5959         sf = ss->ss_server_fld;
5960         LASSERT(sf != NULL);
5961
5962         fld_range_set_any(&range);
5963         rc = fld_server_lookup(env, sf, fid_seq(fid), &range);
5964         if (rc != 0)
5965                 return rc;
5966
5967         if (!fld_range_is_mdt(&range))
5968                 return -EINVAL;
5969
5970         if (range.lsr_index == idx)
5971                 return 1;
5972
5973         return 0;
5974 }
5975
5976 static void lfsck_layout_destroy_orphan(const struct lu_env *env,
5977                                         struct dt_device *dev,
5978                                         struct dt_object *obj)
5979 {
5980         struct thandle *handle;
5981         int             rc;
5982         ENTRY;
5983
5984         handle = dt_trans_create(env, dev);
5985         if (IS_ERR(handle))
5986                 RETURN_EXIT;
5987
5988         rc = dt_declare_ref_del(env, obj, handle);
5989         if (rc != 0)
5990                 GOTO(stop, rc);
5991
5992         rc = dt_declare_destroy(env, obj, handle);
5993         if (rc != 0)
5994                 GOTO(stop, rc);
5995
5996         rc = dt_trans_start_local(env, dev, handle);
5997         if (rc != 0)
5998                 GOTO(stop, rc);
5999
6000         dt_write_lock(env, obj, 0);
6001         rc = dt_ref_del(env, obj, handle);
6002         if (rc == 0)
6003                 rc = dt_destroy(env, obj, handle);
6004         dt_write_unlock(env, obj);
6005
6006         GOTO(stop, rc);
6007
6008 stop:
6009         dt_trans_stop(env, dev, handle);
6010
6011         CDEBUG(D_LFSCK, "destroy orphan OST-object "DFID": rc = %d\n",
6012                PFID(lfsck_dto2fid(obj)), rc);
6013
6014         RETURN_EXIT;
6015 }
6016
/**
 * Lookup is not supported by the orphan index.
 *
 * All the parameters are ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_lookup(const struct lu_env *env,
                                     struct dt_object *dt, struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6025
/**
 * Declaring an insertion is not supported by the orphan index.
 *
 * All the parameters are ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_declare_insert(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_rec *rec,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6034
/**
 * Insertion is not supported by the orphan index.
 *
 * All the parameters are ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_insert(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_rec *rec,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa,
                                     int ignore_quota)
{
        return -EOPNOTSUPP;
}
6045
/**
 * Declaring a deletion is not supported by the orphan index.
 *
 * All the parameters are ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_declare_delete(const struct lu_env *env,
                                             struct dt_object *dt,
                                             const struct dt_key *key,
                                             struct thandle *handle)
{
        return -EOPNOTSUPP;
}
6053
/**
 * Deletion is not supported by the orphan index.
 *
 * All the parameters are ignored.
 *
 * \retval      -EOPNOTSUPP: always
 */
static int lfsck_orphan_index_delete(const struct lu_env *env,
                                     struct dt_object *dt,
                                     const struct dt_key *key,
                                     struct thandle *handle,
                                     struct lustre_capa *capa)
{
        return -EOPNOTSUPP;
}
6062
6063 static struct dt_it *lfsck_orphan_it_init(const struct lu_env *env,
6064                                           struct dt_object *dt,
6065                                           __u32 attr,
6066                                           struct lustre_capa *capa)
6067 {
6068         struct dt_device                *dev    = lu2dt_dev(dt->do_lu.lo_dev);
6069         struct lfsck_instance           *lfsck;
6070         struct lfsck_component          *com    = NULL;
6071         struct lfsck_layout_slave_data  *llsd;
6072         struct lfsck_orphan_it          *it     = NULL;
6073         int                              rc     = 0;
6074         ENTRY;
6075
6076         lfsck = lfsck_instance_find(dev, true, false);
6077         if (unlikely(lfsck == NULL))
6078                 RETURN(ERR_PTR(-ENXIO));
6079
6080         com = lfsck_component_find(lfsck, LFSCK_TYPE_LAYOUT);
6081         if (unlikely(com == NULL))
6082                 GOTO(out, rc = -ENOENT);
6083
6084         llsd = com->lc_data;
6085         if (!llsd->llsd_rbtree_valid)
6086                 GOTO(out, rc = -ESRCH);
6087
6088         OBD_ALLOC_PTR(it);
6089         if (it == NULL)
6090                 GOTO(out, rc = -ENOMEM);
6091
6092         it->loi_llst = lfsck_layout_llst_find_and_del(llsd, attr, false);
6093         if (it->loi_llst == NULL)
6094                 GOTO(out, rc = -ENXIO);
6095
6096         if (dev->dd_record_fid_accessed) {
6097                 /* The first iteration against the rbtree, scan the whole rbtree
6098                  * to remove the nodes which do NOT need to be handled. */
6099                 write_lock(&llsd->llsd_rb_lock);
6100                 if (dev->dd_record_fid_accessed) {
6101                         struct rb_node                  *node;
6102                         struct rb_node                  *next;
6103                         struct lfsck_rbtree_node        *lrn;
6104
6105                         /* No need to record the fid accessing anymore. */
6106                         dev->dd_record_fid_accessed = 0;
6107
6108                         node = rb_first(&llsd->llsd_rb_root);
6109                         while (node != NULL) {
6110                                 next = rb_next(node);
6111                                 lrn = rb_entry(node, struct lfsck_rbtree_node,
6112                                                lrn_node);
6113                                 if (atomic_read(&lrn->lrn_known_count) <=
6114                                     atomic_read(&lrn->lrn_accessed_count)) {
6115                                         rb_erase(node, &llsd->llsd_rb_root);
6116                                         lfsck_rbtree_free(lrn);
6117                                 }
6118                                 node = next;
6119                         }
6120                 }
6121                 write_unlock(&llsd->llsd_rb_lock);
6122         }
6123
6124         /* read lock the rbtree when init, and unlock when fini */
6125         read_lock(&llsd->llsd_rb_lock);
6126         it->loi_com = com;
6127         com = NULL;
6128
6129         GOTO(out, rc = 0);
6130
6131 out:
6132         if (com != NULL)
6133                 lfsck_component_put(env, com);
6134
6135         CDEBUG(D_LFSCK, "%s: init the orphan iteration: rc = %d\n",
6136                lfsck_lfsck2name(lfsck), rc);
6137
6138         lfsck_instance_put(env, lfsck);
6139         if (rc != 0) {
6140                 if (it != NULL)
6141                         OBD_FREE_PTR(it);
6142
6143                 it = (struct lfsck_orphan_it *)ERR_PTR(rc);
6144         }
6145
6146         return (struct dt_it *)it;
6147 }
6148
6149 static void lfsck_orphan_it_fini(const struct lu_env *env,
6150                                  struct dt_it *di)
6151 {
6152         struct lfsck_orphan_it           *it    = (struct lfsck_orphan_it *)di;
6153         struct lfsck_component           *com   = it->loi_com;
6154         struct lfsck_layout_slave_data   *llsd;
6155         struct lfsck_layout_slave_target *llst;
6156
6157         if (com != NULL) {
6158                 CDEBUG(D_LFSCK, "%s: fini the orphan iteration\n",
6159                        lfsck_lfsck2name(com->lc_lfsck));
6160
6161                 llsd = com->lc_data;
6162                 read_unlock(&llsd->llsd_rb_lock);
6163                 llst = it->loi_llst;
6164                 LASSERT(llst != NULL);
6165
6166                 /* Save the key and hash for iterate next. */
6167                 llst->llst_fid = it->loi_key;
6168                 llst->llst_hash = it->loi_hash;
6169                 lfsck_layout_llst_put(llst);
6170                 lfsck_component_put(env, com);
6171         }
6172         OBD_FREE_PTR(it);
6173 }
6174
6175 /**
6176  * \retval       +1: the iteration finished
6177  * \retval        0: on success, not finished
6178  * \retval      -ve: on error
6179  */
6180 static int lfsck_orphan_it_next(const struct lu_env *env,
6181                                 struct dt_it *di)
6182 {
6183         struct lfsck_thread_info        *info   = lfsck_env_info(env);
6184         struct filter_fid_old           *pfid   = &info->lti_old_pfid;
6185         struct lu_attr                  *la     = &info->lti_la;
6186         struct lfsck_orphan_it          *it     = (struct lfsck_orphan_it *)di;
6187         struct lu_fid                   *key    = &it->loi_key;
6188         struct lu_orphan_rec            *rec    = &it->loi_rec;
6189         struct lfsck_component          *com    = it->loi_com;
6190         struct lfsck_instance           *lfsck  = com->lc_lfsck;
6191         struct lfsck_layout_slave_data  *llsd   = com->lc_data;
6192         struct dt_object                *obj;
6193         struct lfsck_rbtree_node        *lrn;
6194         int                              pos;
6195         int                              rc;
6196         __u32                            save;
6197         __u32                            idx    = it->loi_llst->llst_index;
6198         bool                             exact  = false;
6199         ENTRY;
6200
6201         if (it->loi_over)
6202                 RETURN(1);
6203
6204 again0:
6205         lrn = it->loi_lrn;
6206         if (lrn == NULL) {
6207                 lrn = lfsck_rbtree_search(llsd, key, &exact);
6208                 if (lrn == NULL) {
6209                         it->loi_over = 1;
6210                         RETURN(1);
6211                 }
6212
6213                 it->loi_lrn = lrn;
6214                 if (!exact) {
6215                         key->f_seq = lrn->lrn_seq;
6216                         key->f_oid = lrn->lrn_first_oid;
6217                         key->f_ver = 0;
6218                 }
6219         } else {
6220                 key->f_oid++;
6221                 if (unlikely(key->f_oid == 0)) {
6222                         key->f_seq++;
6223                         it->loi_lrn = NULL;
6224                         goto again0;
6225                 }
6226
6227                 if (key->f_oid >=
6228                     lrn->lrn_first_oid + LFSCK_RBTREE_BITMAP_WIDTH) {
6229                         it->loi_lrn = NULL;
6230                         goto again0;
6231                 }
6232         }
6233
6234         if (unlikely(atomic_read(&lrn->lrn_known_count) <=
6235                      atomic_read(&lrn->lrn_accessed_count))) {
6236                 struct rb_node *next = rb_next(&lrn->lrn_node);
6237
6238                 while (next != NULL) {
6239                         lrn = rb_entry(next, struct lfsck_rbtree_node,
6240                                        lrn_node);
6241                         if (atomic_read(&lrn->lrn_known_count) >
6242                             atomic_read(&lrn->lrn_accessed_count))
6243                                 break;
6244                         next = rb_next(next);
6245                 }
6246
6247                 if (next == NULL) {
6248                         it->loi_over = 1;
6249                         RETURN(1);
6250                 }
6251
6252                 it->loi_lrn = lrn;
6253                 key->f_seq = lrn->lrn_seq;
6254                 key->f_oid = lrn->lrn_first_oid;
6255                 key->f_ver = 0;
6256         }
6257
6258         pos = key->f_oid - lrn->lrn_first_oid;
6259
6260 again1:
6261         pos = find_next_bit(lrn->lrn_known_bitmap,
6262                             LFSCK_RBTREE_BITMAP_WIDTH, pos);
6263         if (pos >= LFSCK_RBTREE_BITMAP_WIDTH) {
6264                 key->f_oid = lrn->lrn_first_oid + pos;
6265                 if (unlikely(key->f_oid < lrn->lrn_first_oid)) {
6266                         key->f_seq++;
6267                         key->f_oid = 0;
6268                 }
6269                 it->loi_lrn = NULL;
6270                 goto again0;
6271         }
6272
6273         if (test_bit(pos, lrn->lrn_accessed_bitmap)) {
6274                 pos++;
6275                 goto again1;
6276         }
6277
6278         key->f_oid = lrn->lrn_first_oid + pos;
6279         obj = lfsck_object_find(env, lfsck, key);
6280         if (IS_ERR(obj)) {
6281                 rc = PTR_ERR(obj);
6282                 if (rc == -ENOENT) {
6283                         pos++;
6284                         goto again1;
6285                 }
6286                 RETURN(rc);
6287         }
6288
6289         dt_read_lock(env, obj, 0);
6290         if (!dt_object_exists(obj)) {
6291                 dt_read_unlock(env, obj);
6292                 lfsck_object_put(env, obj);
6293                 pos++;
6294                 goto again1;
6295         }
6296
6297         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
6298         if (rc != 0)
6299                 GOTO(out, rc);
6300
6301         rc = dt_xattr_get(env, obj, lfsck_buf_get(env, pfid, sizeof(*pfid)),
6302                           XATTR_NAME_FID, BYPASS_CAPA);
6303         if (rc == -ENODATA) {
6304                 /* For the pre-created OST-object, update the bitmap to avoid
6305                  * others LFSCK (second phase) iteration to touch it again. */
6306                 if (la->la_ctime == 0) {
6307                         if (!test_and_set_bit(pos, lrn->lrn_accessed_bitmap))
6308                                 atomic_inc(&lrn->lrn_accessed_count);
6309
6310                         /* For the race between repairing dangling referenced
6311                          * MDT-object and unlink the file, it may left orphan
6312                          * OST-object there. Destroy it now! */
6313                         if (unlikely(!(la->la_mode & S_ISUID))) {
6314                                 dt_read_unlock(env, obj);
6315                                 lfsck_layout_destroy_orphan(env,
6316                                                             lfsck->li_bottom,
6317                                                             obj);
6318                                 lfsck_object_put(env, obj);
6319                                 pos++;
6320                                 goto again1;
6321                         }
6322                 } else if (idx == 0) {
6323                         /* If the orphan OST-object has no parent information,
6324                          * regard it as referenced by the MDT-object on MDT0. */
6325                         fid_zero(&rec->lor_fid);
6326                         rec->lor_uid = la->la_uid;
6327                         rec->lor_gid = la->la_gid;
6328                         GOTO(out, rc = 0);
6329                 }
6330
6331                 dt_read_unlock(env, obj);
6332                 lfsck_object_put(env, obj);
6333                 pos++;
6334                 goto again1;
6335         }
6336
6337         if (rc < 0)
6338                 GOTO(out, rc);
6339
6340         if (rc != sizeof(struct filter_fid) &&
6341             rc != sizeof(struct filter_fid_old))
6342                 GOTO(out, rc = -EINVAL);
6343
6344         fid_le_to_cpu(&rec->lor_fid, &pfid->ff_parent);
6345         /* Currently, the filter_fid::ff_parent::f_ver is not the real parent
6346          * MDT-object's FID::f_ver, instead it is the OST-object index in its
6347          * parent MDT-object's layout EA. */
6348         save = rec->lor_fid.f_stripe_idx;
6349         rec->lor_fid.f_ver = 0;
6350         rc = lfsck_fid_match_idx(env, lfsck, &rec->lor_fid, idx);
6351         /* If the orphan OST-object does not claim the MDT, then next.
6352          *
6353          * If we do not know whether it matches or not, then return it
6354          * to the MDT for further check. */
6355         if (rc == 0) {
6356                 dt_read_unlock(env, obj);
6357                 lfsck_object_put(env, obj);
6358                 pos++;
6359                 goto again1;
6360         }
6361
6362         rec->lor_fid.f_stripe_idx = save;
6363         rec->lor_uid = la->la_uid;
6364         rec->lor_gid = la->la_gid;
6365
6366         CDEBUG(D_LFSCK, "%s: return orphan "DFID", PFID "DFID", owner %u:%u\n",
6367                lfsck_lfsck2name(com->lc_lfsck), PFID(key), PFID(&rec->lor_fid),
6368                rec->lor_uid, rec->lor_gid);
6369
6370         GOTO(out, rc = 0);
6371
6372 out:
6373         dt_read_unlock(env, obj);
6374         lfsck_object_put(env, obj);
6375         if (rc == 0)
6376                 it->loi_hash++;
6377
6378         return rc;
6379 }
6380
6381 /**
6382  * \retval       +1: locate to the exactly position
6383  * \retval        0: cannot locate to the exactly position,
6384  *                   call next() to move to a valid position.
6385  * \retval      -ve: on error
6386  */
6387 static int lfsck_orphan_it_get(const struct lu_env *env,
6388                                struct dt_it *di,
6389                                const struct dt_key *key)
6390 {
6391         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6392         int                      rc;
6393
6394         it->loi_key = *(struct lu_fid *)key;
6395         rc = lfsck_orphan_it_next(env, di);
6396         if (rc == 1)
6397                 return 0;
6398
6399         if (rc == 0)
6400                 return 1;
6401
6402         return rc;
6403 }
6404
/**
 * Release the current position of the iteration.
 *
 * Nothing is held per position by the orphan iteration, so this is an
 * intentionally empty operation. Both parameters are ignored.
 */
static void lfsck_orphan_it_put(const struct lu_env *env, struct dt_it *di)
{
}
6409
6410 static struct dt_key *lfsck_orphan_it_key(const struct lu_env *env,
6411                                           const struct dt_it *di)
6412 {
6413         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6414
6415         return (struct dt_key *)&it->loi_key;
6416 }
6417
6418 static int lfsck_orphan_it_key_size(const struct lu_env *env,
6419                                     const struct dt_it *di)
6420 {
6421         return sizeof(struct lu_fid);
6422 }
6423
6424 static int lfsck_orphan_it_rec(const struct lu_env *env,
6425                                const struct dt_it *di,
6426                                struct dt_rec *rec,
6427                                __u32 attr)
6428 {
6429         struct lfsck_orphan_it *it = (struct lfsck_orphan_it *)di;
6430
6431         *(struct lu_orphan_rec *)rec = it->loi_rec;
6432
6433         return 0;
6434 }
6435
6436 static __u64 lfsck_orphan_it_store(const struct lu_env *env,
6437                                    const struct dt_it *di)
6438 {
6439         struct lfsck_orphan_it  *it   = (struct lfsck_orphan_it *)di;
6440
6441         return it->loi_hash;
6442 }
6443
6444 /**
6445  * \retval       +1: locate to the exactly position
6446  * \retval        0: cannot locate to the exactly position,
6447  *                   call next() to move to a valid position.
6448  * \retval      -ve: on error
6449  */
6450 static int lfsck_orphan_it_load(const struct lu_env *env,
6451                                 const struct dt_it *di,
6452                                 __u64 hash)
6453 {
6454         struct lfsck_orphan_it           *it   = (struct lfsck_orphan_it *)di;
6455         struct lfsck_layout_slave_target *llst = it->loi_llst;
6456         int                               rc;
6457
6458         LASSERT(llst != NULL);
6459
6460         if (hash != llst->llst_hash) {
6461                 CDEBUG(D_LFSCK, "%s: the given hash "LPU64" for orphan "
6462                        "iteration does not match the one when fini "
6463                        LPU64", to be reset.\n",
6464                        lfsck_lfsck2name(it->loi_com->lc_lfsck), hash,
6465                        llst->llst_hash);
6466                 fid_zero(&llst->llst_fid);
6467                 llst->llst_hash = 0;
6468         }
6469
6470         it->loi_key = llst->llst_fid;
6471         it->loi_hash = llst->llst_hash;
6472         rc = lfsck_orphan_it_next(env, (struct dt_it *)di);
6473         if (rc == 1)
6474                 return 0;
6475
6476         if (rc == 0)
6477                 return 1;
6478
6479         return rc;
6480 }
6481
/**
 * Pack the key and the record of the current position together.
 *
 * Nothing is packed by the orphan iteration: all the parameters are
 * ignored and \a key_rec is left untouched.
 *
 * \retval        0: always
 */
static int lfsck_orphan_it_key_rec(const struct lu_env *env,
                                   const struct dt_it *di, void *key_rec)
{
        return 0;
}
6488
6489 const struct dt_index_operations lfsck_orphan_index_ops = {
6490         .dio_lookup             = lfsck_orphan_index_lookup,
6491         .dio_declare_insert     = lfsck_orphan_index_declare_insert,
6492         .dio_insert             = lfsck_orphan_index_insert,
6493         .dio_declare_delete     = lfsck_orphan_index_declare_delete,
6494         .dio_delete             = lfsck_orphan_index_delete,
6495         .dio_it = {
6496                 .init           = lfsck_orphan_it_init,
6497                 .fini           = lfsck_orphan_it_fini,
6498                 .get            = lfsck_orphan_it_get,
6499                 .put            = lfsck_orphan_it_put,
6500                 .next           = lfsck_orphan_it_next,
6501                 .key            = lfsck_orphan_it_key,
6502                 .key_size       = lfsck_orphan_it_key_size,
6503                 .rec            = lfsck_orphan_it_rec,
6504                 .store          = lfsck_orphan_it_store,
6505                 .load           = lfsck_orphan_it_load,
6506                 .key_rec        = lfsck_orphan_it_key_rec,
6507         }
6508 };